This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f3dcb628eb [SPARK-52509][CORE] Cleanup individual shuffles from fallback storage on `RemoveShuffle` event
a1f3dcb628eb is described below

commit a1f3dcb628eb6848aefae261c29305f5e033c9df
Author: Enrico Minack <[email protected]>
AuthorDate: Fri Oct 24 22:07:49 2025 -0700

    [SPARK-52509][CORE] Cleanup individual shuffles from fallback storage on `RemoveShuffle` event
    
    ### What changes were proposed in this pull request?
    Shuffle data of individual shuffles is now deleted from the fallback storage during regular shuffle cleanup.
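
    For illustration, a minimal configuration sketch that enables this behavior (a hedged example: the `s3a://` path is a placeholder, while the config keys come from this patch and existing Spark configs):

    ```scala
    import org.apache.spark.SparkConf

    // Sketch only: enable decommissioning with fallback storage, plus the
    // cleanUp flag that now also removes per-shuffle directories on RemoveShuffle.
    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
      .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/")
      .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
    ```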
    
    ### Why are the changes needed?
    Currently, the shuffle data is only removed from the fallback storage on Spark context shutdown. Long-running Spark jobs accumulate shuffle data even though Spark no longer uses it. Those shuffles should be cleaned up while the Spark context is still running.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests, plus a manual test via this [reproduction example](https://gist.github.com/EnricoMi/e9daa1176bce4c1211af3f3c5848112a/3140527bcbedec51ed2c571885db774c880cb941).
    
    Run the reproduction example without the ` <<< "$scala"` part, then execute this code in the Spark shell:
    
    ```scala
    import org.apache.spark.sql.SaveMode
    
    val n = 100000000
    val j = spark.sparkContext.broadcast(1000)
    val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
    x.as[Int]
     .mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
     .groupBy($"value" % 1000).as[Int, Int]
     .flatMapSortedGroups($"value") { case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
     .write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")
    ```
    This writes some data of shuffle 0 to the fallback storage.
    
    Invoking `System.gc()` removes that shuffle directory from the fallback storage. Exiting the Spark shell removes the whole application directory.
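
    A minimal sketch of that manual check from the shell (`/tmp/fallback` is a placeholder for whatever `spark.storage.decommission.fallbackStorage.path` was set to):

    ```scala
    // Placeholder path; substitute the configured fallback storage path.
    val appDir = new java.io.File("/tmp/fallback", spark.sparkContext.applicationId)
    println(appDir.list().mkString(", "))  // expect a "0" directory for shuffle 0

    System.gc()  // lets the ContextCleaner send RemoveShuffle for unreferenced shuffles
    // shortly after, the "0" directory should be gone; appDir itself remains until shutdown
    ```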
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51199 from EnricoMi/fallback-storage-cleanup.
    
    Authored-by: Enrico Minack <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  3 +-
 .../org/apache/spark/internal/config/package.scala | 20 ++++++----
 .../org/apache/spark/storage/FallbackStorage.scala | 24 +++++++++---
 .../spark/storage/FallbackStorageSuite.scala       | 45 ++++++++++++++++++++--
 4 files changed, 73 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b0ac6d96a001..898bbad26b7e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
-    FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
+    FallbackStorage.registerBlockManagerIfNeeded(
+      _env.blockManager.master, _conf, _hadoopConfiguration)
 
     // The metrics system for Driver need to be set spark.app.id to app ID.
     // So it should start after we get app ID from the task scheduler and set spark.app.id.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9a4ef1fbd7f6..9876848f654a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -604,23 +604,27 @@ package object config {
         "cache block replication should be positive.")
       .createWithDefaultString("30s")
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
+      .doc("If true, Spark cleans up its fallback storage data once individual 
shuffles are " +
+        "freed (interval configured via spark.cleaner.periodicGC.interval), 
and during " +
+        "shutting down.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
     ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
       .doc("The location for fallback storage during block manager 
decommissioning. " +
         "For example, `s3a://spark-storage/`. In case of empty, fallback 
storage is disabled. " +
-        "The storage should be managed by TTL because Spark will not clean it 
up.")
+        "The storage will not be cleaned up by Spark unless " +
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP.key} is true. " +
+        "Use an external clean up mechanism when false, for instance a TTL.")
       .version("3.1.0")
       .stringConf
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional
 
-  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
-    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
-      .doc("If true, Spark cleans up its fallback storage data during shutting 
down.")
-      .version("3.2.0")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
     ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
       .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index c086be0bfe80..19cdebd80ebf 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
 import org.apache.spark.util.Utils
 
 /**
@@ -95,7 +96,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   }
 }
 
-private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
+    extends RpcEndpointRef(conf) {
   // scalastyle:off executioncontextglobal
   import scala.concurrent.ExecutionContext.Implicits.global
   // scalastyle:on executioncontextglobal
@@ -103,6 +105,11 @@ private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRe
   override def name: String = "fallback"
   override def send(message: Any): Unit = {}
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    message match {
+      case RemoveShuffle(shuffleId) =>
+        FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
+      case _ => // no-op
+    }
     Future{true.asInstanceOf[T]}
   }
 }
@@ -120,20 +127,25 @@ private[spark] object FallbackStorage extends Logging {
   }
 
   /** Register the fallback block manager and its RPC endpoint. */
-  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+  def registerBlockManagerIfNeeded(
+      master: BlockManagerMaster,
+      conf: SparkConf,
+      hadoopConf: Configuration): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
       master.registerBlockManager(
-        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
+        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0,
+        new FallbackStorageRpcEndpointRef(conf, hadoopConf))
     }
   }
 
-  /** Clean up the generated fallback location for this app. */
-  def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
+  /** Clean up the generated fallback location for this app (and shuffle id if given). */
+  def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
         conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
         conf.contains("spark.app.id")) {
-      val fallbackPath =
+      val fallbackPath = shuffleId.foldLeft(
         new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
+      ) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
       val fallbackUri = fallbackPath.toUri
       val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
       // The fallback directory for this app may not be created yet.
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 6c51bd4ff2e2..6df8bc85b510 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
 
     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + 
"/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
 
     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     assert(readResult.nioByteBuffer().array().sameElements(content))
   }
 
-  test("SPARK-34142: fallback storage API - cleanUp") {
+  test("SPARK-34142: fallback storage API - cleanUp app") {
     withTempDir { dir =>
       Seq(true, false).foreach { cleanUp =>
         val appId = s"test$cleanUp"
@@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
         val location = new File(dir, appId)
         assert(location.mkdir())
         assert(location.exists())
+        val shuffle = new File(location, "1")
+        assert(shuffle.mkdir())
+        assert(shuffle.exists())
         FallbackStorage.cleanUp(conf, new Configuration())
         assert(location.exists() != cleanUp)
+        assert(shuffle.exists() != cleanUp)
+      }
+    }
+  }
+
+  test("SPARK-34142: fallback storage API - cleanUp shuffle") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { cleanUp =>
+        val appId = s"test$cleanUp"
+        val conf = new SparkConf(false)
+          .set("spark.app.id", appId)
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)
+
+        val location = new File(dir, appId)
+        assert(location.mkdir())
+        assert(location.exists())
+        val shuffle1 = new File(location, "1")
+        assert(shuffle1.mkdir())
+        assert(shuffle1.exists())
+        val shuffle2 = new File(location, "2")
+        assert(shuffle2.mkdir())
+        assert(shuffle2.exists())
+        FallbackStorage.cleanUp(conf, new Configuration(), Some(1))
+        assert(location.exists())
+        assert(shuffle1.exists() != cleanUp)
+        assert(shuffle2.exists())
       }
     }
   }
@@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
 
     val ids = Set((1, 1L, 1))
     val bm = mock(classOf[BlockManager])
@@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
 
     when(bm.getPeers(mc.any()))
       .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
     when(bm.master).thenReturn(bmm)
     val blockTransferService = mock(classOf[BlockTransferService])
     when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
