This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 47555da2ae2 [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
47555da2ae2 is described below

commit 47555da2ae292b07488ba181db1aceac8e7ddb3a
Author: Frank Yin <[email protected]>
AuthorDate: Thu Aug 24 22:19:41 2023 -0700

    [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
    
    ### What changes were proposed in this pull request?
    
    Fix a bug that makes the RDD decommissioner never finish.
    
    ### Why are the changes needed?
    
    The cached RDD decommissioner gets stuck in an endless retry loop when the only viable peer is the fallback storage, which it does not know how to handle.
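    
    For context, a minimal sketch (not part of the patch) of how the peer check changes; it assumes access to Spark's `org.apache.spark.storage` internals, where `FallbackStorage.FALLBACK_BLOCK_MANAGER_ID` is the sentinel `BlockManagerId` for the fallback storage:
    
    ```scala
    import org.apache.spark.storage.{BlockManagerId, FallbackStorage}
    
    // Old check: stop migration only when there are no peers at all. A peer list
    // holding just the fallback storage therefore kept the retry loop alive.
    def noViablePeersOld(peers: Seq[BlockManagerId]): Boolean = peers.isEmpty
    
    // New check: also stop when the fallback storage is the only peer. Equivalent
    // to peers.forall(_ == FallbackStorage.FALLBACK_BLOCK_MANAGER_ID), which
    // covers the empty-list case as well.
    def noViablePeersNew(peers: Seq[BlockManagerId]): Boolean =
      !peers.exists(_ != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
    ```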
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests are added, and the change was also verified by running Spark jobs.
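    
    The core of the added unit test (full code in the diff below): `getPeers` is mocked to return only the fallback storage's `BlockManagerId`, and the test asserts that the decommissioner gives up RDD migration instead of retrying forever:
    
    ```scala
    // The only "peer" is the fallback storage, so there is no viable target.
    when(bm.getPeers(mc.any()))
      .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
    
    bmDecomManager.start()
    eventually(timeout(100.second), interval(10.milliseconds)) {
      // The cached RDD block is never replicated to the fallback storage...
      verify(bm, never()).replicateBlock(
        mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
      // ...and the migration loop has stopped rather than spinning forever.
      assert(bmDecomManager.stoppedRDD)
    }
    ```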
    
    Closes #42155 from ukby1234/franky.SPARK-44547.
    
    Authored-by: Frank Yin <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/storage/BlockManagerDecommissioner.scala |  4 +--
 .../BlockManagerDecommissionUnitSuite.scala        | 35 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index e871ad4192f..59d1f3b4c4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -187,7 +187,7 @@ private[storage] class BlockManagerDecommissioner(
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
-  @volatile private var stoppedRDD =
+  @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
   @volatile private var stoppedShuffle =
     !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
@@ -207,7 +207,7 @@ private[storage] class BlockManagerDecommissioner(
       logInfo("Attempting to migrate all RDD blocks")
       while (!stopped && !stoppedRDD) {
        // Validate if we have peers to migrate to. Otherwise, give up migration.
-        if (bm.getPeers(false).isEmpty) {
+        if (!bm.getPeers(false).exists(_ != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)) {
          logWarning("No available peers to receive RDD blocks, stop migration.")
           stoppedRDD = true
         } else {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index df4f256afb6..67a4514d5bd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -23,7 +23,7 @@ import scala.concurrent.Future
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 
@@ -350,4 +350,37 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
         bmDecomManager.stop()
     }
   }
+
+  test("SPARK-44547: test cached rdd migration no available hosts") {
+    val blockTransferService = mock(classOf[BlockTransferService])
+    val bm = mock(classOf[BlockManager])
+
+    val storedBlockId1 = RDDBlockId(0, 0)
+    val storedBlock1 =
+      new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", "host1", bmPort)), 1)
+
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq(storedBlock1))
+
+    val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+
+    try {
+      bmDecomManager.start()
+      eventually(timeout(100.second), interval(10.milliseconds)) {
+        verify(bm, never()).replicateBlock(
+          mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
+        assert(bmDecomManager.rddBlocksLeft)
+        assert(bmDecomManager.stoppedRDD)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
