This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 118167f444f0 [SPARK-48928] Log Warning for Calling .unpersist() on
Locally Checkpointed RDDs
118167f444f0 is described below
commit 118167f444f05db26e8b1a8b52dd741720ed2447
Author: Mingkang Li <[email protected]>
AuthorDate: Tue Jul 23 14:41:47 2024 -0500
[SPARK-48928] Log Warning for Calling .unpersist() on Locally Checkpointed
RDDs
### What changes were proposed in this pull request?
This pull request proposes logging a warning message when the
`.unpersist()` method is called on RDDs that have been locally checkpointed.
The goal is to inform users about the potential risks associated with
unpersisting locally checkpointed RDDs without changing the current behavior of
the method.
### Why are the changes needed?
Local checkpointing stores an RDD's data in Spark's existing caching layer and
truncates its lineage, so the RDD can no longer be recomputed from its source.
If a locally checkpointed RDD is unpersisted, it loses its data and cannot be
regenerated, potentially leading to job failures when a later action reads the
RDD, directly or through a downstream transformation (this has been observed in
some user workloads). Logging a warning message helps users avoid such pitfalls
and aids in debugging.
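The failure mode described above can be illustrated with a toy model. This is a minimal sketch, not Spark's API: `ToyRdd` and `ToyRddDemo` are hypothetical names, the cache and lineage are reduced to two `Option` fields, and `localCheckpoint()` materializes eagerly here, whereas Spark materializes the data on the next action.

```scala
// Hypothetical toy model (not Spark's API): data is served either from the
// cache or by recomputing from the lineage, and local checkpointing removes
// the lineage, so the cache becomes the only copy.
final class ToyRdd(source: () => Seq[Int]) {
  // Stand-in for the blocks held in Spark's caching layer.
  private var cached: Option[Seq[Int]] = None
  // Stand-in for the lineage: how to recompute the data from its source.
  private var lineage: Option[() => Seq[Int]] = Some(source)

  def isLocallyCheckpointed: Boolean = lineage.isEmpty

  // Materializes the data into the cache, then truncates the lineage.
  // (Eager for simplicity; Spark defers this to the next action.)
  def localCheckpoint(): this.type = {
    cached = Some(collect())
    lineage = None
    this
  }

  // Drops the cached data; the lineage, if any, is left untouched.
  def unpersist(): this.type = {
    cached = None
    this
  }

  // Reads from the cache first, falls back to recomputing from the lineage,
  // and fails when neither is available.
  def collect(): Seq[Int] =
    cached
      .orElse(lineage.map(recompute => recompute()))
      .getOrElse(throw new IllegalStateException(
        "data was unpersisted and lineage is truncated; cannot recompute"))
}

object ToyRddDemo {
  def main(args: Array[String]): Unit = {
    val plain = new ToyRdd(() => Seq(1, 2, 3))
    plain.unpersist()
    println(plain.collect()) // recomputed from lineage: List(1, 2, 3)

    val checkpointed = new ToyRdd(() => Seq(4, 5)).localCheckpoint()
    println(checkpointed.collect()) // served from the cache: List(4, 5)
    checkpointed.unpersist()
    println(scala.util.Try(checkpointed.collect()).isFailure) // true
  }
}
```

In this model, unpersisting an ordinary RDD is harmless because the lineage can rebuild the data, while unpersisting a locally checkpointed one leaves no way to recover it.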
### Does this PR introduce _any_ user-facing change?
Yes, this PR adds a warning log message when `.unpersist()` is called on a
locally checkpointed RDD, but it does not alter any existing behavior.
### How was this patch tested?
This PR does not change any existing behavior and therefore no tests are
added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47391 from mingkangli-db/warning_unpersist.
Authored-by: Mingkang Li <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ac93abf3fe7a..0db0133f632b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -211,6 +211,11 @@ abstract class RDD[T: ClassTag](
* @return This RDD.
*/
def unpersist(blocking: Boolean = false): this.type = {
+ if (isLocallyCheckpointed) {
+ // This means its lineage has been truncated and cannot be recomputed once unpersisted.
+ logWarning(log"RDD ${MDC(RDD_ID, id)} was locally checkpointed, its lineage has been" +
+ log" truncated and cannot be recomputed after unpersisting")
+ }
logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
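The guard in this hunk follows a "warn, then proceed unchanged" pattern. The sketch below reproduces that shape outside Spark so it can be run and inspected; it is an illustration under stated assumptions, not the patch itself. `RecordingLogger` and `ToyPersistedRdd` are hypothetical, and plain `s"..."` interpolation stands in for Spark's `log"..."` interpolator and `MDC(RDD_ID, id)` keys.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical logger that records messages instead of printing them, so the
// effect of the guard can be inspected. Spark itself logs through its Logging
// trait with the structured log"..." interpolator seen in the diff.
final class RecordingLogger {
  val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def logWarning(msg: String): Unit = messages += s"WARN $msg"
  def logInfo(msg: String): Unit = messages += s"INFO $msg"
}

// Hypothetical stand-in for an RDD, reduced to the fields the guard needs.
final class ToyPersistedRdd(val id: Int, val isLocallyCheckpointed: Boolean, log: RecordingLogger) {
  var isPersisted: Boolean = true

  def unpersist(): this.type = {
    // Same shape as the patch: emit the extra warning only for locally
    // checkpointed RDDs, then run the pre-existing logic unchanged.
    if (isLocallyCheckpointed) {
      log.logWarning(s"RDD $id was locally checkpointed, its lineage has been " +
        "truncated and cannot be recomputed after unpersisting")
    }
    log.logInfo(s"Removing RDD $id from persistence list")
    isPersisted = false
    this
  }
}

object UnpersistGuardDemo {
  def main(args: Array[String]): Unit = {
    val log = new RecordingLogger
    new ToyPersistedRdd(7, isLocallyCheckpointed = true, log).unpersist()
    new ToyPersistedRdd(8, isLocallyCheckpointed = false, log).unpersist()
    log.messages.foreach(println)
  }
}
```

Because the warning is emitted before the existing info message and nothing else changes, an RDD that was not locally checkpointed produces the same log output and ends in the same state as before.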