This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 32fc10c35c4 [SPARK-41886][CONNECT][PYTHON] `DataFrame.intersect` doctest output has different order
32fc10c35c4 is described below
commit 32fc10c35c48f96cce1d1d713c148572755a01e6
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Jan 10 22:47:08 2023 +0800
[SPARK-41886][CONNECT][PYTHON] `DataFrame.intersect` doctest output has different order
### What changes were proposed in this pull request?
```
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 609, in pyspark.sql.connect.dataframe.DataFrame.intersect
Failed example:
    df1.intersect(df2).show()
```
Expected:
```
+---+---+
| C1| C2|
+---+---+
| b| 3|
| a| 1|
+---+---+
```
Got:
```
+---+---+
| C1| C2|
+---+---+
| a| 1|
| b| 3|
+---+---+
<BLANKLINE>
```
After investigation, the root cause is that PySpark and Spark Connect produce different physical plans for the same query.
The plan generated by PySpark is shown below.
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
   +- Exchange hashpartitioning(C1#2603, C2#2604L, 200), ENSURE_REQUIREMENTS, [plan_id=11257]
      +- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
         +- SortMergeJoin [coalesce(C1#2603, ), isnull(C1#2603), coalesce(C2#2604L, 0), isnull(C2#2604L)], [coalesce(C1#2607, ), isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L)], LeftSemi
            :- Sort [coalesce(C1#2603, ) ASC NULLS FIRST, isnull(C1#2603) ASC NULLS FIRST, coalesce(C2#2604L, 0) ASC NULLS FIRST, isnull(C2#2604L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(C1#2603, ), isnull(C1#2603), coalesce(C2#2604L, 0), isnull(C2#2604L), 200), ENSURE_REQUIREMENTS, [plan_id=11250]
            :     +- Scan ExistingRDD[C1#2603,C2#2604L]
            +- Sort [coalesce(C1#2607, ) ASC NULLS FIRST, isnull(C1#2607) ASC NULLS FIRST, coalesce(C2#2608L, 0) ASC NULLS FIRST, isnull(C2#2608L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(C1#2607, ), isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L), 200), ENSURE_REQUIREMENTS, [plan_id=11251]
                  +- Scan ExistingRDD[C1#2607,C2#2608L]
```
The plan generated by Spark Connect is shown below.
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
   +- Exchange hashpartitioning(C1#1988, C2#1989L, 200), ENSURE_REQUIREMENTS, [plan_id=1550]
      +- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
         +- BroadcastHashJoin [coalesce(C1#1988, ), isnull(C1#1988), coalesce(C2#1989L, 0), isnull(C2#1989L)], [coalesce(C1#1996, ), isnull(C1#1996), coalesce(C2#1997L, 0), isnull(C2#1997L)], LeftSemi, BuildRight, false
            :- LocalTableScan [C1#1988, C2#1989L]
            +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [plan_id=1546]
               +- LocalTableScan [C1#1996, C2#1997L]
```
So the presence of a shuffle changes the order of the output rows.
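The order-sensitivity can be illustrated without Spark at all. A minimal sketch in plain Python (the names and data below are illustrative, not from the patch): an unordered collection stands in for a shuffled result, and an explicit sort is what pins the output down.

```python
# A set stands in for a shuffled Spark result: a doctest should not
# rely on its iteration order.
rows = {("a", 1), ("b", 3), ("c", 4)} & {("a", 1), ("b", 3)}

# Fragile: the printed order of an unordered collection may vary.
unordered = list(rows)

# Deterministic: an explicit sort pins the order, the same idea as
# adding .sort(...) to the DataFrame.intersect doctest.
ordered = sorted(rows, reverse=True)
print(ordered)  # [('b', 3), ('a', 1)]
```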
### Why are the changes needed?
This PR adds a `sort` operator to the `DataFrame.intersect` doctest so that its output has a deterministic order.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
Manual test and doc test.
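As a Spark-free sketch of why the doc test now passes reliably: with Python's stdlib `doctest` module, an example whose output is sorted always matches its expected text. The function and data below are made up for illustration only.

```python
import doctest

def intersect_sorted(a, b):
    """Sorted intersection of two row lists.

    The expected output below is stable because of the sort:

    >>> intersect_sorted([("a", 1), ("b", 3), ("c", 4)], [("a", 1), ("b", 3)])
    [('a', 1), ('b', 3)]
    """
    return sorted(set(a) & set(b))

# Run the docstring examples in this module; a sorted result
# cannot fail on ordering.
result = doctest.testmod()
print(result.failed)  # 0
```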
Closes #39483 from beliefer/SPARK-41886.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/dataframe.py | 3 ---
python/pyspark/sql/dataframe.py | 2 +-
2 files changed, 1 insertion(+), 4 deletions(-)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index b81244dc113..c9cd65b93b7 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1632,9 +1632,6 @@ def _test() -> None:
del pyspark.sql.connect.dataframe.DataFrame.drop.__doc__
del pyspark.sql.connect.dataframe.DataFrame.join.__doc__
- # TODO(SPARK-41886): The doctest output has different order
- del pyspark.sql.connect.dataframe.DataFrame.intersect.__doc__
-
# TODO(SPARK-41625): Support Structured Streaming
del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a437400fb90..09a5e9d0b07 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -3657,7 +3657,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
--------
>>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
>>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
- >>> df1.intersect(df2).show()
+ >>> df1.intersect(df2).sort(df1.C1.desc()).show()
+---+---+
| C1| C2|
+---+---+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]