This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 6468f96ea42 [SPARK-45386][SQL][3.5] Fix correctness issue with persist
using StorageLevel.NONE on Dataset
6468f96ea42 is described below
commit 6468f96ea42f6efe42033507c4e26600b751bfcc
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Thu Oct 5 09:41:08 2023 +0900
[SPARK-45386][SQL][3.5] Fix correctness issue with persist using
StorageLevel.NONE on Dataset
### What changes were proposed in this pull request?
Support for InMemoryTableScanExec in AQE was added in #39624, but that
patch contained a bug that appears when a Dataset is persisted using
`StorageLevel.NONE`.
Before that patch a query like:
```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2. But after that patch it incorrectly returns 0.
This is because AQE incorrectly determines that the input is empty, based on
the runtime statistics that are collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
The problem is that the action that should make sure the statistics are
collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator. With `StorageLevel.NONE` the persisting also does not
use the iterator, so the correct statistics are never gathered.
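The failure mode described above can be illustrated with a small toy model. This is only a sketch and not Spark code: `LazyStatsSketch`, `RowCounter`, and `counted` are hypothetical names, and the sketch assumes that statistics are updated purely as a side effect of consuming an iterator.

```scala
// Toy model (not Spark code): statistics are updated only as a side effect
// of consuming an iterator, so an unconsumed iterator reports zero rows.
object LazyStatsSketch {
  // Hypothetical stand-in for a runtime statistics accumulator.
  final class RowCounter { var rows: Long = 0L }

  // Wraps an iterator so that each consumed element bumps the counter.
  // Iterator.map is lazy, so nothing is counted until elements are pulled.
  def counted[A](input: Iterator[A], counter: RowCounter): Iterator[A] =
    input.map { a => counter.rows += 1; a }

  def main(args: Array[String]): Unit = {
    val untouched = new RowCounter
    val it1 = counted(Iterator(1, 2), untouched)
    // An action that never pulls from the iterator leaves the counter at 0,
    // so a planner trusting it would conclude that the input is empty.
    assert(untouched.rows == 0L)

    val drained = new RowCounter
    val it2 = counted(Iterator(1, 2), drained)
    it2.foreach(_ => ()) // consuming the iterator gathers the real count
    assert(drained.rows == 2L)
  }
}
```

In this model, a row count of 0 means "nothing was read yet", not "the input is empty", which is the distinction the runtime statistics lose in the buggy case.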
The proposed fix in the patch just makes calling persist with
`StorageLevel.NONE` a no-op. Changing the action so that it always "emptied" the
iterator would also work, but that seems like unnecessary work in a lot
of normal circumstances.
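The shape of the no-op approach can be sketched with a toy registry. This is a simplified illustration, not Spark's `CacheManager`: `NoOpPersistSketch`, `ToyCacheRegistry`, `NoneLevel`, and `MemoryLevel` are hypothetical names, and plans are represented as plain strings.

```scala
// Toy model (not Spark's CacheManager): a registry that ignores requests
// to cache with a level that would store nothing.
object NoOpPersistSketch {
  sealed trait Level
  case object NoneLevel extends Level   // hypothetical analogue of StorageLevel.NONE
  case object MemoryLevel extends Level // hypothetical level that does store data

  final class ToyCacheRegistry {
    private val cached = scala.collection.mutable.Map.empty[String, Level]

    def cacheQuery(plan: String, level: Level): Unit =
      if (level == NoneLevel) {
        // Nothing would be stored, so the plan is not registered at all.
      } else if (cached.contains(plan)) {
        println("Asked to cache already cached data.")
      } else {
        cached(plan) = level
      }

    def isCached(plan: String): Boolean = cached.contains(plan)
  }

  def main(args: Array[String]): Unit = {
    val registry = new ToyCacheRegistry
    registry.cacheQuery("q1", NoneLevel)
    assert(!registry.isCached("q1")) // the NONE-level request was a no-op
    registry.cacheQuery("q2", MemoryLevel)
    assert(registry.isCached("q2"))
  }
}
```

Because the plan is never registered, later lookups do not find a cache entry for it, and the query runs as if persist had not been called.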
### Why are the changes needed?
The current code has a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.
### How was this patch tested?
New and existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.
Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/sql/execution/CacheManager.scala | 4 +++-
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e0..e906c74f8a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
planToCache: LogicalPlan,
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
- if (lookupCachedData(planToCache).nonEmpty) {
+ if (storageLevel == StorageLevel.NONE) {
+ // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+ } else if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c967540541a..6d9c43f866a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
case class TestDataPoint2(x: Int, s: String)
@@ -2535,6 +2536,11 @@ class DatasetSuite extends QueryTest
checkDataset(ds.filter(f(col("_1"))), Tuple1(ValueClass(2)))
}
+
+ test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+ val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+ assert(ds.count() == 2)
+ }
}
class DatasetLargeResultCollectingSuite extends QueryTest