This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 118167f444f0 [SPARK-48928] Log Warning for Calling .unpersist() on Locally Checkpointed RDDs
118167f444f0 is described below

commit 118167f444f05db26e8b1a8b52dd741720ed2447
Author: Mingkang Li <[email protected]>
AuthorDate: Tue Jul 23 14:41:47 2024 -0500

    [SPARK-48928] Log Warning for Calling .unpersist() on Locally Checkpointed RDDs
    
    ### What changes were proposed in this pull request?
    
    This pull request proposes logging a warning message when the `.unpersist()` method is called on RDDs that have been locally checkpointed. The goal is to inform users about the potential risks associated with unpersisting locally checkpointed RDDs without changing the current behavior of the method.
    
    ### Why are the changes needed?
    
    Local checkpointing truncates the lineage of an RDD, preventing it from being recomputed from its source. If a locally checkpointed RDD is unpersisted, it loses its data and cannot be regenerated, potentially leading to job failures if subsequent actions or transformations are attempted on the RDD (which was seen on some user workloads). Logging a warning message helps users avoid such pitfalls and aids in debugging.
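    
    The failure mode described above can be sketched roughly as follows. This is an illustrative example, not part of the patch: the object name, app name, `local[2]` master, and sample data are all assumptions, and the exact exception message depends on the Spark version.
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext, SparkException}
    
    // Hypothetical driver program; names and sizes are illustrative only.
    object LocalCheckpointUnpersistSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("local-checkpoint-unpersist-sketch")
          .setMaster("local[2]")
        val sc = new SparkContext(conf)
        try {
          val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
    
          // Mark the RDD for local checkpointing, then run an action so the
          // checkpoint is materialized and the lineage is truncated.
          rdd.localCheckpoint()
          rdd.count()
    
          // Removes the cached blocks that back the local checkpoint.
          // With this commit applied, the new warning should be logged here.
          rdd.unpersist(blocking = true)
    
          // The lineage is gone and the blocks are gone, so a further action
          // is expected to fail rather than recompute the data.
          try {
            rdd.count()
          } catch {
            case e: SparkException =>
              println(s"Action failed after unpersist: ${e.getMessage}")
          }
        } finally {
          sc.stop()
        }
      }
    }
    ```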
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR adds a warning log message when `.unpersist()` is called on a locally checkpointed RDD, but it does not alter any existing behavior.
    
    ### How was this patch tested?
    
    This PR does not change any existing behavior and therefore no tests are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47391 from mingkangli-db/warning_unpersist.
    
    Authored-by: Mingkang Li <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ac93abf3fe7a..0db0133f632b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -211,6 +211,11 @@ abstract class RDD[T: ClassTag](
    * @return This RDD.
    */
   def unpersist(blocking: Boolean = false): this.type = {
+    if (isLocallyCheckpointed) {
+      // This means its lineage has been truncated and cannot be recomputed once unpersisted.
+      logWarning(log"RDD ${MDC(RDD_ID, id)} was locally checkpointed, its lineage has been" +
+        log" truncated and cannot be recomputed after unpersisting")
+    }
     logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list")
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
