This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dcdcb80c536 [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes
dcdcb80c536 is described below
commit dcdcb80c53681d1daff416c007cf8a2810155625
Author: Peter Toth <[email protected]>
AuthorDate: Fri Jan 20 18:35:33 2023 -0800
[SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes
### What changes were proposed in this pull request?
This is a small correctness fix to
`DataSourceUtils.getPartitionFiltersAndDataFilters()` so that it correctly
handles filters that reference no attributes. For example, without the fix the
following query on a ParquetV2 source:
```
import tempfile

from pyspark.sql.functions import lit, udf

path = tempfile.mkdtemp()
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.range(1).write.mode("overwrite").format("parquet").save(path)
df = spark.read.parquet(path).toDF("i")
f = udf(lambda x: False, "boolean")(lit(1))
r = df.filter(f)
r.show()
```
returns
```
+---+
| i|
+---+
| 0|
+---+
```
but it should return an empty result.
The root cause of the issue is that during `V2ScanRelationPushDown` a
filter that doesn't reference any column is incorrectly identified as a
partition filter: its empty reference set is trivially a subset of the
partition column set.
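A minimal sketch of both the old and the fixed predicate, assuming Catalyst's `AttributeSet` semantics (the attribute `p` is a made-up stand-in, not taken from the patch):
```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

val p = AttributeReference("p", IntegerType)()   // stand-in partition column
val partitionSet = AttributeSet(Seq(p))
val noRefs = Literal(false).references           // a filter with no column references has an empty set

assert(noRefs.subsetOf(partitionSet))                        // old check: true, so misclassified
assert(!(noRefs.nonEmpty && noRefs.subsetOf(partitionSet)))  // fixed check: false, stays a data filter
```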
### Why are the changes needed?
To fix a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixes a correctness issue.
### How was this patch tested?
Added a new unit test.
Closes #39676 from peter-toth/SPARK-42134-fix-getpartitionfiltersanddatafilters.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
python/pyspark/sql/tests/test_udf.py | 18 ++++++++++++++++++
.../sql/execution/datasources/DataSourceUtils.scala | 2 +-
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 79fee7a48e5..1a2ec213ca6 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -681,6 +681,24 @@ class BaseUDFTests(object):
         finally:
             shutil.rmtree(path)
 
+    # SPARK-42134
+    def test_file_dsv2_with_udf_filter(self):
+        from pyspark.sql.functions import lit
+
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            with self.sql_conf({"spark.sql.sources.useV1SourceList": ""}):
+                self.spark.range(1).write.mode("overwrite").format("parquet").save(path)
+                df = self.spark.read.parquet(path).toDF("i")
+                f = udf(lambda x: False, "boolean")(lit(1))
+                result = df.filter(f)
+                self.assertEqual(0, result.count())
+
+        finally:
+            shutil.rmtree(path)
+
     # SPARK-25591
     def test_same_accumulator_in_udfs(self):
         data_schema = StructType(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 26f22670a51..5eb422f80e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -273,7 +273,7 @@ object DataSourceUtils extends PredicateHelper {
     }
     val partitionSet = AttributeSet(partitionColumns)
     val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
-      f.references.subsetOf(partitionSet)
+      f.references.nonEmpty && f.references.subsetOf(partitionSet)
     )
     val extraPartitionFilter =
       dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
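
For illustration, a hedged sketch of the corrected split (the expressions `partFilter` and `litFilter` are hypothetical examples, not from the patch): a filter over a partition column still goes to `partitionFilters`, while a column-free filter now lands in `dataFilters` and gets evaluated.
```
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

val p = AttributeReference("p", IntegerType)()       // hypothetical partition column
val partitionSet = AttributeSet(Seq(p))
val partFilter: Expression = EqualTo(p, Literal(1))  // references exactly {p}
val litFilter: Expression = Literal(false)           // references no columns

val (partitionFilters, dataFilters) = Seq(partFilter, litFilter).partition { f =>
  f.references.nonEmpty && f.references.subsetOf(partitionSet)
}
assert(partitionFilters == Seq(partFilter))
assert(dataFilters == Seq(litFilter))  // no longer pruned away as a partition filter
```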
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]