This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c11804c440 [SPARK-45652][SQL] SPJ: Handle empty input partitions
after dynamic filtering
8c11804c440 is described below
commit 8c11804c44066c059036cac8365f997fceb2c8e6
Author: Chao Sun <[email protected]>
AuthorDate: Thu Oct 26 08:52:10 2023 -0700
[SPARK-45652][SQL] SPJ: Handle empty input partitions after dynamic
filtering
### What changes were proposed in this pull request?
Handle the case where, with storage-partitioned join (SPJ) enabled, all
input partitions are filtered out by V2 dynamic filtering.
### Why are the changes needed?
Currently, when all input partitions are filtered out via dynamic
filtering, SPJ does not work and instead fails with the following exception:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
at
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
at
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
at
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
at
org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
```
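This is because the `groupPartitions` method returns `None` in this
scenario, and `BatchScanExec` calls `.get` on the result unconditionally.
The fix maps over the `Option` and falls back to an empty sequence instead.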
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test case for this.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43531 from sunchao/SPARK-45652.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../execution/datasources/v2/BatchScanExec.scala | 2 +-
.../connector/KeyGroupedPartitioningSuite.scala | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 094a7b20808..afcc762e636 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -100,7 +100,7 @@ case class BatchScanExec(
"partition values that are not present in the original
partitioning.")
}
- groupPartitions(newPartitions).get.groupedParts.map(_.parts)
+
groupPartitions(newPartitions).map(_.groupedParts.map(_.parts)).getOrElse(Seq.empty)
case _ =>
// no validation is needed as the data source did not report any
specific partitioning
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 4cb5457b66b..e6448d4d80f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1477,4 +1477,46 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
}
+
+ test("SPARK-45652: SPJ should handle empty partition after dynamic
filtering") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, items_schema, items_partitions)
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+ s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+ s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ val purchases_partitions = Array(identity("item_id"))
+ createTable(purchases, purchases_schema, purchases_partitions)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+ s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+ s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+ s"(3, 19.5, cast('2020-02-01' as timestamp))")
+
+ Seq(true, false).foreach { pushDownValues =>
+ Seq(true, false).foreach { partiallyClustered => {
+ withSQLConf(
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key
->
+ partiallyClustered.toString,
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key ->
pushDownValues.toString) {
+ // The dynamic filtering effectively filtered out all the
partitions
+ val df = sql(s"SELECT p.price from testcat.ns.$items i,
testcat.ns.$purchases p " +
+ "WHERE i.id = p.item_id AND i.price > 50.0")
+ checkAnswer(df, Seq.empty)
+ }
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]