This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f3f567775a62 [SPARK-57247][SQL][CONNECT] Support DataFrame.zip in Spark Connect
f3f567775a62 is described below
commit f3f567775a62f9ca3119fc12de01ab212d25dff7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sat Jun 6 09:39:44 2026 +0800
[SPARK-57247][SQL][CONNECT] Support DataFrame.zip in Spark Connect
### What changes were proposed in this pull request?
This is a follow-up to #54976 ([SPARK-55886]), which implemented
`DataFrame.zip` for the classic path and deferred Spark Connect support. This
PR wires up the Connect path end-to-end.
- **Protocol (`relations.proto`)**: adds a `Zip` message with `left` and
`right` `Relation` fields (field 48 in the `Relation` oneof). Python stubs were
regenerated with the `connect-gen-protos` Docker image (buf 1.66.1, mypy 1.19.1,
mypy-protobuf 3.3.0, ruff 0.14.8).
- **Server (`SparkConnectPlanner`)**: adds `transformZip`, which directly
constructs the unresolved `logical.Zip(left, right)` plan and is dispatched via
`RelTypeCase.ZIP`. `ResolveZip` then runs during analysis, as on the classic
path.
- **Scala Connect `Dataset`**: replaces the `UnsupportedOperationException`
stub with `sparkSession.newDataFrame { builder =>
builder.getZipBuilder.setLeft(...).setRight(...) }`, following the
`crossJoin`/`buildJoin` pattern.
- **Python Connect `plan.py`**: adds `class Zip(LogicalPlan)` following the
`NearestByJoin` pattern.
- **Python Connect `dataframe.py`**: replaces the
`PySparkNotImplementedError` stub with a `plan.Zip` call; removes the doctest
suppression (`del DataFrame.zip.__doc__`) that was added when Connect was
unsupported.
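To make the client/server split above concrete, here is a toy Python model of the dispatch; it is not Spark code. A relation is a dict with exactly one key set, standing in for the `Relation` oneof, and `transform_relation` (a hypothetical name) plays the role of `SparkConnectPlanner`: it builds an unresolved zip node from both children and leaves resolution to a later analysis step.

```python
# Toy model, not Spark's API: a "relation" is a dict with one key set,
# standing in for the Relation oneof; the key plays the role of RelTypeCase.
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class UnresolvedTable:
    name: str


@dataclass(frozen=True)
class UnresolvedZip:
    # Stand-in for the unresolved logical.Zip(left, right) node; in Spark,
    # ResolveZip handles it later, during analysis.
    left: Any
    right: Any


def transform_relation(rel: Dict[str, Any]) -> Any:
    # A oneof holds at most one case; this toy also rejects an empty relation.
    if len(rel) != 1:
        raise ValueError(f"expected exactly one relation type, got {sorted(rel)}")
    case, body = next(iter(rel.items()))
    handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {
        "read": lambda b: UnresolvedTable(b["table"]),
        # Mirrors the transformZip idea: plan both children recursively and
        # wrap them without resolving anything yet.
        "zip": lambda b: UnresolvedZip(
            transform_relation(b["left"]), transform_relation(b["right"])
        ),
    }
    if case not in handlers:
        raise NotImplementedError(f"unsupported relation type: {case}")
    return handlers[case](body)


# Roughly the shape of a request for left.zip(right) over the same table.
request = {"zip": {"left": {"read": {"table": "t"}}, "right": {"read": {"table": "t"}}}}
plan = transform_relation(request)
```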
### Why are the changes needed?
`DataFrame.zip` was merged in #54976 with Connect support deferred. This PR
completes the implementation so that Connect users can use `zip` just as on the
classic path.
### Does this PR introduce _any_ user-facing change?
Yes. `DataFrame.zip` now works on the Spark Connect path. Previously it
raised `PySparkNotImplementedError: [NOT_IMPLEMENTED] zip is not implemented.`
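For illustration only, the plain-Python emulation below mirrors the result the new `DataFrameSuite` test expects (columns `a`, `b`; first row `(1, 2)`). It assumes, as the `Zip` proto docstring suggests, that both inputs are projections of one shared base, so each output row takes its left and right columns from the same base row. `zip_projections` is a hypothetical helper, not a PySpark API.

```python
# Plain-Python emulation of left.zip(right) when both sides project the same
# base rows. Hypothetical helper; no Spark involved.
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, int]


def zip_projections(
    base: List[Row], left_cols: Sequence[str], right_cols: Sequence[str]
) -> Tuple[List[str], List[Tuple[int, ...]]]:
    # Output schema: the left projection's columns, then the right's.
    columns = list(left_cols) + list(right_cols)
    # One output row per base row, drawing both sides from that same row.
    rows = [
        tuple(r[c] for c in left_cols) + tuple(r[c] for c in right_cols)
        for r in base
    ]
    return columns, rows


base = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
columns, rows = zip_projections(base, ["a"], ["b"])
```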
### How was this patch tested?
- `test_parity_zip.py`: runs the full `DataFrameZipTestsMixin` (basic
projections, expressions, one-sided base, `withColumn`, chained `withColumn`,
longer chains, parent-with-chained-child, `withColumnRenamed`, scalar Python
UDF, pandas UDF, and two error cases) against a Connect session.
- `test_connect_plan.py`: asserts that the proto plan for `left.zip(right)`
has the `zip` field set with the expected left/right sources.
- `PlanGenerationTestSuite`: serializes a `zip` plan to proto and compares
against a new golden file (`zip.proto.bin`).
- `ProtoToParsedPlanTestSuite`: deserializes the proto golden file, runs it
through `SparkConnectPlanner` + `Analyzer`, and compares the explained plan
against `zip.explain`.
- `DataFrameSuite` (Connect): end-to-end test that zips two projections
over a Connect session and asserts the resulting columns and values.
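The two golden-file suites follow a familiar pattern: serialize (or explain) the plan, then byte-compare it with a checked-in file, rewriting the file only on request. A minimal sketch of that comparison step, assuming a hypothetical `check_golden` helper and `regenerate` flag; Spark's own suites are written in Scala and are not reproduced here.

```python
# Hypothetical golden-file check: compare freshly generated bytes with a
# checked-in file, rewriting the file only when explicitly asked to.
from pathlib import Path


def check_golden(path: Path, actual: bytes, regenerate: bool = False) -> None:
    if regenerate:
        path.write_bytes(actual)
        return
    if not path.exists():
        raise AssertionError(f"missing golden file: {path}")
    expected = path.read_bytes()
    if expected != actual:
        raise AssertionError(
            f"{path.name} differs from generated output "
            f"({len(expected)} vs {len(actual)} bytes); regenerate if intended"
        )
```

The same helper works for text output such as an `.explain` file by passing `text.encode()`.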
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #56300 from zhengruifeng/spark-dev-2-df-zip-connect-dev2.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/dataframe.py | 11 +-
python/pyspark/sql/connect/plan.py | 34 ++
python/pyspark/sql/connect/proto/relations_pb2.py | 354 +++++++++++----------
python/pyspark/sql/connect/proto/relations_pb2.pyi | 41 +++
.../pyspark/sql/tests/connect/test_connect_plan.py | 14 +
.../pyspark/sql/tests/connect/test_parity_zip.py | 14 +-
.../apache/spark/sql/PlanGenerationTestSuite.scala | 4 +
.../apache/spark/sql/connect/DataFrameSuite.scala | 15 +
.../main/protobuf/spark/connect/relations.proto | 13 +
.../org/apache/spark/sql/connect/Dataset.scala | 7 +-
.../query-tests/explain-results/zip.explain | 2 +
.../test/resources/query-tests/queries/zip.json | 81 +++++
.../resources/query-tests/queries/zip.proto.bin | Bin 0 -> 495 bytes
.../sql/connect/planner/SparkConnectPlanner.scala | 6 +
14 files changed, 401 insertions(+), 195 deletions(-)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 0c8e8d9fe9ba..7551210036c9 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -380,9 +380,10 @@ class DataFrame(ParentDataFrame):
)
def zip(self, other: ParentDataFrame) -> ParentDataFrame:
- raise PySparkNotImplementedError(
- errorClass="NOT_IMPLEMENTED",
- messageParameters={"feature": "zip"},
+ other = self._check_same_session(other)
+ return DataFrame(
+ plan.Zip(self._plan, other._plan),
+ session=self._session,
)
def _check_same_session(self, other: ParentDataFrame) -> "DataFrame":
@@ -2515,10 +2516,6 @@ def _test() -> None:
globs = pyspark.sql.dataframe.__dict__.copy()
- # `zip` is not yet supported on Spark Connect; the parent docstring's
- # example would call into the connect impl and fail with NOT_IMPLEMENTED.
- del pyspark.sql.dataframe.DataFrame.zip.__doc__
-
if not is_remote_only():
del pyspark.sql.dataframe.DataFrame.toJSON.__doc__
del pyspark.sql.dataframe.DataFrame.rdd.__doc__
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 540d81ffc690..51f117516663 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1447,6 +1447,40 @@ class NearestByJoin(LogicalPlan):
"""
+class Zip(LogicalPlan):
+ def __init__(self, left: Optional[LogicalPlan], right: LogicalPlan) -> None:
+ super().__init__(left)
+ self.left = cast(LogicalPlan, left)
+ self.right = right
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = self._create_proto_relation()
+ plan.zip.left.CopyFrom(self.left.plan(session))
+ plan.zip.right.CopyFrom(self.right.plan(session))
+ return self._with_relations(plan, session)
+
+ @property
+ def observations(self) -> Dict[str, "Observation"]:
+ return {**super().observations, **self.right.observations}
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ o = " " * (indent + LogicalPlan.INDENT)
+ n = indent + LogicalPlan.INDENT * 2
+ return f"{i}<Zip>\n{o}left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>Zip</b><br />
+ Left: {self.left._repr_html_()}
+ Right: {self.right._repr_html_()}
+ </li>
+ </ul>
+ """
+
+
class SetOperation(LogicalPlan):
def __init__(
self,
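The `print` layout of the new `Zip` node is easiest to see with stand-ins. The sketch below copies the string logic of `Zip.print` onto hypothetical `Leaf` and `ZipNode` classes, assuming `LogicalPlan.INDENT == 2` (the real value is not shown in this diff). The last line shows that the `observations` merge lets a right-side entry win on a duplicate key, which is plain dict-unpacking behavior.

```python
# Hypothetical stand-ins (not pyspark classes) that reuse Zip.print's layout.
INDENT = 2  # assumed value of LogicalPlan.INDENT


class Leaf:
    def __init__(self, name: str) -> None:
        self.name = name

    def print(self, indent: int = 0) -> str:
        return " " * indent + f"<{self.name}>"


class ZipNode:
    def __init__(self, left: Leaf, right: Leaf) -> None:
        self.left = left
        self.right = right

    def print(self, indent: int = 0) -> str:
        i = " " * indent  # the node's own line
        o = " " * (indent + INDENT)  # "left=" / "right=" labels
        n = indent + INDENT * 2  # children sit one level below the labels
        return f"{i}<Zip>\n{o}left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"


tree = ZipNode(Leaf("Read"), Leaf("Read")).print()

# Same merge shape as Zip.observations: right-side entries win on a clash.
merged = {**{"m": "left", "x": 1}, **{"m": "right"}}
```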
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index f63b61fc344e..8b56455e69f2 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -44,7 +44,7 @@ from pyspark.sql.connect.proto import ml_common_pb2 as spark_dot_connect_dot_ml_
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/ml_common.proto"\xa1\x1f\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x [...]
+
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/ml_common.proto"\xc9\x1f\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x [...]
)
_globals = globals()
@@ -82,179 +82,181 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_PARSE_OPTIONSENTRY"]._loaded_options = None
_globals["_PARSE_OPTIONSENTRY"]._serialized_options = b"8\001"
_globals["_RELATION"]._serialized_start = 224
- _globals["_RELATION"]._serialized_end = 4225
- _globals["_MLRELATION"]._serialized_start = 4228
- _globals["_MLRELATION"]._serialized_end = 4712
- _globals["_MLRELATION_TRANSFORM"]._serialized_start = 4440
- _globals["_MLRELATION_TRANSFORM"]._serialized_end = 4675
- _globals["_FETCH"]._serialized_start = 4715
- _globals["_FETCH"]._serialized_end = 5046
- _globals["_FETCH_METHOD"]._serialized_start = 4831
- _globals["_FETCH_METHOD"]._serialized_end = 5046
- _globals["_FETCH_METHOD_ARGS"]._serialized_start = 4919
- _globals["_FETCH_METHOD_ARGS"]._serialized_end = 5046
- _globals["_UNKNOWN"]._serialized_start = 5048
- _globals["_UNKNOWN"]._serialized_end = 5057
- _globals["_RELATIONCOMMON"]._serialized_start = 5060
- _globals["_RELATIONCOMMON"]._serialized_end = 5202
- _globals["_SQL"]._serialized_start = 5205
- _globals["_SQL"]._serialized_end = 5683
- _globals["_SQL_ARGSENTRY"]._serialized_start = 5499
- _globals["_SQL_ARGSENTRY"]._serialized_end = 5589
- _globals["_SQL_NAMEDARGUMENTSENTRY"]._serialized_start = 5591
- _globals["_SQL_NAMEDARGUMENTSENTRY"]._serialized_end = 5683
- _globals["_WITHRELATIONS"]._serialized_start = 5685
- _globals["_WITHRELATIONS"]._serialized_end = 5802
- _globals["_READ"]._serialized_start = 5805
- _globals["_READ"]._serialized_end = 6522
- _globals["_READ_NAMEDTABLE"]._serialized_start = 5983
- _globals["_READ_NAMEDTABLE"]._serialized_end = 6175
- _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_start = 6117
- _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_end = 6175
- _globals["_READ_DATASOURCE"]._serialized_start = 6178
- _globals["_READ_DATASOURCE"]._serialized_end = 6509
- _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_start = 6117
- _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_end = 6175
- _globals["_RELATIONCHANGES"]._serialized_start = 6525
- _globals["_RELATIONCHANGES"]._serialized_end = 6757
- _globals["_RELATIONCHANGES_OPTIONSENTRY"]._serialized_start = 6117
- _globals["_RELATIONCHANGES_OPTIONSENTRY"]._serialized_end = 6175
- _globals["_PROJECT"]._serialized_start = 6759
- _globals["_PROJECT"]._serialized_end = 6876
- _globals["_FILTER"]._serialized_start = 6878
- _globals["_FILTER"]._serialized_end = 6990
- _globals["_JOIN"]._serialized_start = 6993
- _globals["_JOIN"]._serialized_end = 7654
- _globals["_JOIN_JOINDATATYPE"]._serialized_start = 7332
- _globals["_JOIN_JOINDATATYPE"]._serialized_end = 7424
- _globals["_JOIN_JOINTYPE"]._serialized_start = 7427
- _globals["_JOIN_JOINTYPE"]._serialized_end = 7635
- _globals["_SETOPERATION"]._serialized_start = 7657
- _globals["_SETOPERATION"]._serialized_end = 8136
- _globals["_SETOPERATION_SETOPTYPE"]._serialized_start = 7973
- _globals["_SETOPERATION_SETOPTYPE"]._serialized_end = 8087
- _globals["_LIMIT"]._serialized_start = 8138
- _globals["_LIMIT"]._serialized_end = 8214
- _globals["_OFFSET"]._serialized_start = 8216
- _globals["_OFFSET"]._serialized_end = 8295
- _globals["_TAIL"]._serialized_start = 8297
- _globals["_TAIL"]._serialized_end = 8372
- _globals["_AGGREGATE"]._serialized_start = 8375
- _globals["_AGGREGATE"]._serialized_end = 9141
- _globals["_AGGREGATE_PIVOT"]._serialized_start = 8790
- _globals["_AGGREGATE_PIVOT"]._serialized_end = 8901
- _globals["_AGGREGATE_GROUPINGSETS"]._serialized_start = 8903
- _globals["_AGGREGATE_GROUPINGSETS"]._serialized_end = 8979
- _globals["_AGGREGATE_GROUPTYPE"]._serialized_start = 8982
- _globals["_AGGREGATE_GROUPTYPE"]._serialized_end = 9141
- _globals["_SORT"]._serialized_start = 9144
- _globals["_SORT"]._serialized_end = 9304
- _globals["_DROP"]._serialized_start = 9307
- _globals["_DROP"]._serialized_end = 9448
- _globals["_DEDUPLICATE"]._serialized_start = 9451
- _globals["_DEDUPLICATE"]._serialized_end = 9691
- _globals["_LOCALRELATION"]._serialized_start = 9693
- _globals["_LOCALRELATION"]._serialized_end = 9782
- _globals["_CACHEDLOCALRELATION"]._serialized_start = 9784
- _globals["_CACHEDLOCALRELATION"]._serialized_end = 9856
- _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_start = 9858
- _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_end = 9970
- _globals["_CACHEDREMOTERELATION"]._serialized_start = 9972
- _globals["_CACHEDREMOTERELATION"]._serialized_end = 10027
- _globals["_SAMPLE"]._serialized_start = 10030
- _globals["_SAMPLE"]._serialized_end = 10303
- _globals["_RANGE"]._serialized_start = 10306
- _globals["_RANGE"]._serialized_end = 10451
- _globals["_SUBQUERYALIAS"]._serialized_start = 10453
- _globals["_SUBQUERYALIAS"]._serialized_end = 10567
- _globals["_REPARTITION"]._serialized_start = 10570
- _globals["_REPARTITION"]._serialized_end = 10712
- _globals["_SHOWSTRING"]._serialized_start = 10715
- _globals["_SHOWSTRING"]._serialized_end = 10857
- _globals["_HTMLSTRING"]._serialized_start = 10859
- _globals["_HTMLSTRING"]._serialized_end = 10973
- _globals["_STATSUMMARY"]._serialized_start = 10975
- _globals["_STATSUMMARY"]._serialized_end = 11067
- _globals["_STATDESCRIBE"]._serialized_start = 11069
- _globals["_STATDESCRIBE"]._serialized_end = 11150
- _globals["_STATCROSSTAB"]._serialized_start = 11152
- _globals["_STATCROSSTAB"]._serialized_end = 11253
- _globals["_STATCOV"]._serialized_start = 11255
- _globals["_STATCOV"]._serialized_end = 11351
- _globals["_STATCORR"]._serialized_start = 11354
- _globals["_STATCORR"]._serialized_end = 11491
- _globals["_STATAPPROXQUANTILE"]._serialized_start = 11494
- _globals["_STATAPPROXQUANTILE"]._serialized_end = 11658
- _globals["_STATFREQITEMS"]._serialized_start = 11660
- _globals["_STATFREQITEMS"]._serialized_end = 11785
- _globals["_STATSAMPLEBY"]._serialized_start = 11788
- _globals["_STATSAMPLEBY"]._serialized_end = 12097
- _globals["_STATSAMPLEBY_FRACTION"]._serialized_start = 11989
- _globals["_STATSAMPLEBY_FRACTION"]._serialized_end = 12088
- _globals["_NAFILL"]._serialized_start = 12100
- _globals["_NAFILL"]._serialized_end = 12234
- _globals["_NADROP"]._serialized_start = 12237
- _globals["_NADROP"]._serialized_end = 12371
- _globals["_NAREPLACE"]._serialized_start = 12374
- _globals["_NAREPLACE"]._serialized_end = 12670
- _globals["_NAREPLACE_REPLACEMENT"]._serialized_start = 12529
- _globals["_NAREPLACE_REPLACEMENT"]._serialized_end = 12670
- _globals["_TODF"]._serialized_start = 12672
- _globals["_TODF"]._serialized_end = 12760
- _globals["_WITHCOLUMNSRENAMED"]._serialized_start = 12763
- _globals["_WITHCOLUMNSRENAMED"]._serialized_end = 13145
- _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_start = 13007
- _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_end = 13074
- _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_start = 13076
- _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_end = 13145
- _globals["_WITHCOLUMNS"]._serialized_start = 13147
- _globals["_WITHCOLUMNS"]._serialized_end = 13266
- _globals["_WITHWATERMARK"]._serialized_start = 13269
- _globals["_WITHWATERMARK"]._serialized_end = 13403
- _globals["_HINT"]._serialized_start = 13406
- _globals["_HINT"]._serialized_end = 13538
- _globals["_UNPIVOT"]._serialized_start = 13541
- _globals["_UNPIVOT"]._serialized_end = 13868
- _globals["_UNPIVOT_VALUES"]._serialized_start = 13798
- _globals["_UNPIVOT_VALUES"]._serialized_end = 13857
- _globals["_TRANSPOSE"]._serialized_start = 13870
- _globals["_TRANSPOSE"]._serialized_end = 13992
- _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_start = 13994
- _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_end = 14119
- _globals["_TOSCHEMA"]._serialized_start = 14121
- _globals["_TOSCHEMA"]._serialized_end = 14227
- _globals["_REPARTITIONBYEXPRESSION"]._serialized_start = 14230
- _globals["_REPARTITIONBYEXPRESSION"]._serialized_end = 14433
- _globals["_MAPPARTITIONS"]._serialized_start = 14436
- _globals["_MAPPARTITIONS"]._serialized_end = 14668
- _globals["_GROUPMAP"]._serialized_start = 14671
- _globals["_GROUPMAP"]._serialized_end = 15521
- _globals["_TRANSFORMWITHSTATEINFO"]._serialized_start = 15524
- _globals["_TRANSFORMWITHSTATEINFO"]._serialized_end = 15747
- _globals["_COGROUPMAP"]._serialized_start = 15750
- _globals["_COGROUPMAP"]._serialized_end = 16276
- _globals["_APPLYINPANDASWITHSTATE"]._serialized_start = 16279
- _globals["_APPLYINPANDASWITHSTATE"]._serialized_end = 16636
- _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_start = 16639
- _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_end = 16883
- _globals["_PYTHONUDTF"]._serialized_start = 16886
- _globals["_PYTHONUDTF"]._serialized_end = 17063
- _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_start = 17066
- _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_end = 17217
- _globals["_PYTHONDATASOURCE"]._serialized_start = 17219
- _globals["_PYTHONDATASOURCE"]._serialized_end = 17294
- _globals["_COLLECTMETRICS"]._serialized_start = 17297
- _globals["_COLLECTMETRICS"]._serialized_end = 17433
- _globals["_PARSE"]._serialized_start = 17436
- _globals["_PARSE"]._serialized_end = 17846
- _globals["_PARSE_OPTIONSENTRY"]._serialized_start = 6117
- _globals["_PARSE_OPTIONSENTRY"]._serialized_end = 6175
- _globals["_PARSE_PARSEFORMAT"]._serialized_start = 17725
- _globals["_PARSE_PARSEFORMAT"]._serialized_end = 17835
- _globals["_ASOFJOIN"]._serialized_start = 17849
- _globals["_ASOFJOIN"]._serialized_end = 18324
- _globals["_LATERALJOIN"]._serialized_start = 18327
- _globals["_LATERALJOIN"]._serialized_end = 18557
- _globals["_NEARESTBYJOIN"]._serialized_start = 18560
- _globals["_NEARESTBYJOIN"]._serialized_end = 18853
+ _globals["_RELATION"]._serialized_end = 4265
+ _globals["_MLRELATION"]._serialized_start = 4268
+ _globals["_MLRELATION"]._serialized_end = 4752
+ _globals["_MLRELATION_TRANSFORM"]._serialized_start = 4480
+ _globals["_MLRELATION_TRANSFORM"]._serialized_end = 4715
+ _globals["_FETCH"]._serialized_start = 4755
+ _globals["_FETCH"]._serialized_end = 5086
+ _globals["_FETCH_METHOD"]._serialized_start = 4871
+ _globals["_FETCH_METHOD"]._serialized_end = 5086
+ _globals["_FETCH_METHOD_ARGS"]._serialized_start = 4959
+ _globals["_FETCH_METHOD_ARGS"]._serialized_end = 5086
+ _globals["_UNKNOWN"]._serialized_start = 5088
+ _globals["_UNKNOWN"]._serialized_end = 5097
+ _globals["_RELATIONCOMMON"]._serialized_start = 5100
+ _globals["_RELATIONCOMMON"]._serialized_end = 5242
+ _globals["_SQL"]._serialized_start = 5245
+ _globals["_SQL"]._serialized_end = 5723
+ _globals["_SQL_ARGSENTRY"]._serialized_start = 5539
+ _globals["_SQL_ARGSENTRY"]._serialized_end = 5629
+ _globals["_SQL_NAMEDARGUMENTSENTRY"]._serialized_start = 5631
+ _globals["_SQL_NAMEDARGUMENTSENTRY"]._serialized_end = 5723
+ _globals["_WITHRELATIONS"]._serialized_start = 5725
+ _globals["_WITHRELATIONS"]._serialized_end = 5842
+ _globals["_READ"]._serialized_start = 5845
+ _globals["_READ"]._serialized_end = 6562
+ _globals["_READ_NAMEDTABLE"]._serialized_start = 6023
+ _globals["_READ_NAMEDTABLE"]._serialized_end = 6215
+ _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_start = 6157
+ _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_end = 6215
+ _globals["_READ_DATASOURCE"]._serialized_start = 6218
+ _globals["_READ_DATASOURCE"]._serialized_end = 6549
+ _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_start = 6157
+ _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_end = 6215
+ _globals["_RELATIONCHANGES"]._serialized_start = 6565
+ _globals["_RELATIONCHANGES"]._serialized_end = 6797
+ _globals["_RELATIONCHANGES_OPTIONSENTRY"]._serialized_start = 6157
+ _globals["_RELATIONCHANGES_OPTIONSENTRY"]._serialized_end = 6215
+ _globals["_PROJECT"]._serialized_start = 6799
+ _globals["_PROJECT"]._serialized_end = 6916
+ _globals["_FILTER"]._serialized_start = 6918
+ _globals["_FILTER"]._serialized_end = 7030
+ _globals["_JOIN"]._serialized_start = 7033
+ _globals["_JOIN"]._serialized_end = 7694
+ _globals["_JOIN_JOINDATATYPE"]._serialized_start = 7372
+ _globals["_JOIN_JOINDATATYPE"]._serialized_end = 7464
+ _globals["_JOIN_JOINTYPE"]._serialized_start = 7467
+ _globals["_JOIN_JOINTYPE"]._serialized_end = 7675
+ _globals["_SETOPERATION"]._serialized_start = 7697
+ _globals["_SETOPERATION"]._serialized_end = 8176
+ _globals["_SETOPERATION_SETOPTYPE"]._serialized_start = 8013
+ _globals["_SETOPERATION_SETOPTYPE"]._serialized_end = 8127
+ _globals["_LIMIT"]._serialized_start = 8178
+ _globals["_LIMIT"]._serialized_end = 8254
+ _globals["_OFFSET"]._serialized_start = 8256
+ _globals["_OFFSET"]._serialized_end = 8335
+ _globals["_TAIL"]._serialized_start = 8337
+ _globals["_TAIL"]._serialized_end = 8412
+ _globals["_AGGREGATE"]._serialized_start = 8415
+ _globals["_AGGREGATE"]._serialized_end = 9181
+ _globals["_AGGREGATE_PIVOT"]._serialized_start = 8830
+ _globals["_AGGREGATE_PIVOT"]._serialized_end = 8941
+ _globals["_AGGREGATE_GROUPINGSETS"]._serialized_start = 8943
+ _globals["_AGGREGATE_GROUPINGSETS"]._serialized_end = 9019
+ _globals["_AGGREGATE_GROUPTYPE"]._serialized_start = 9022
+ _globals["_AGGREGATE_GROUPTYPE"]._serialized_end = 9181
+ _globals["_SORT"]._serialized_start = 9184
+ _globals["_SORT"]._serialized_end = 9344
+ _globals["_DROP"]._serialized_start = 9347
+ _globals["_DROP"]._serialized_end = 9488
+ _globals["_DEDUPLICATE"]._serialized_start = 9491
+ _globals["_DEDUPLICATE"]._serialized_end = 9731
+ _globals["_LOCALRELATION"]._serialized_start = 9733
+ _globals["_LOCALRELATION"]._serialized_end = 9822
+ _globals["_CACHEDLOCALRELATION"]._serialized_start = 9824
+ _globals["_CACHEDLOCALRELATION"]._serialized_end = 9896
+ _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_start = 9898
+ _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_end = 10010
+ _globals["_CACHEDREMOTERELATION"]._serialized_start = 10012
+ _globals["_CACHEDREMOTERELATION"]._serialized_end = 10067
+ _globals["_SAMPLE"]._serialized_start = 10070
+ _globals["_SAMPLE"]._serialized_end = 10343
+ _globals["_RANGE"]._serialized_start = 10346
+ _globals["_RANGE"]._serialized_end = 10491
+ _globals["_SUBQUERYALIAS"]._serialized_start = 10493
+ _globals["_SUBQUERYALIAS"]._serialized_end = 10607
+ _globals["_REPARTITION"]._serialized_start = 10610
+ _globals["_REPARTITION"]._serialized_end = 10752
+ _globals["_SHOWSTRING"]._serialized_start = 10755
+ _globals["_SHOWSTRING"]._serialized_end = 10897
+ _globals["_HTMLSTRING"]._serialized_start = 10899
+ _globals["_HTMLSTRING"]._serialized_end = 11013
+ _globals["_STATSUMMARY"]._serialized_start = 11015
+ _globals["_STATSUMMARY"]._serialized_end = 11107
+ _globals["_STATDESCRIBE"]._serialized_start = 11109
+ _globals["_STATDESCRIBE"]._serialized_end = 11190
+ _globals["_STATCROSSTAB"]._serialized_start = 11192
+ _globals["_STATCROSSTAB"]._serialized_end = 11293
+ _globals["_STATCOV"]._serialized_start = 11295
+ _globals["_STATCOV"]._serialized_end = 11391
+ _globals["_STATCORR"]._serialized_start = 11394
+ _globals["_STATCORR"]._serialized_end = 11531
+ _globals["_STATAPPROXQUANTILE"]._serialized_start = 11534
+ _globals["_STATAPPROXQUANTILE"]._serialized_end = 11698
+ _globals["_STATFREQITEMS"]._serialized_start = 11700
+ _globals["_STATFREQITEMS"]._serialized_end = 11825
+ _globals["_STATSAMPLEBY"]._serialized_start = 11828
+ _globals["_STATSAMPLEBY"]._serialized_end = 12137
+ _globals["_STATSAMPLEBY_FRACTION"]._serialized_start = 12029
+ _globals["_STATSAMPLEBY_FRACTION"]._serialized_end = 12128
+ _globals["_NAFILL"]._serialized_start = 12140
+ _globals["_NAFILL"]._serialized_end = 12274
+ _globals["_NADROP"]._serialized_start = 12277
+ _globals["_NADROP"]._serialized_end = 12411
+ _globals["_NAREPLACE"]._serialized_start = 12414
+ _globals["_NAREPLACE"]._serialized_end = 12710
+ _globals["_NAREPLACE_REPLACEMENT"]._serialized_start = 12569
+ _globals["_NAREPLACE_REPLACEMENT"]._serialized_end = 12710
+ _globals["_TODF"]._serialized_start = 12712
+ _globals["_TODF"]._serialized_end = 12800
+ _globals["_WITHCOLUMNSRENAMED"]._serialized_start = 12803
+ _globals["_WITHCOLUMNSRENAMED"]._serialized_end = 13185
+ _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_start = 13047
+ _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_end = 13114
+ _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_start = 13116
+ _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_end = 13185
+ _globals["_WITHCOLUMNS"]._serialized_start = 13187
+ _globals["_WITHCOLUMNS"]._serialized_end = 13306
+ _globals["_WITHWATERMARK"]._serialized_start = 13309
+ _globals["_WITHWATERMARK"]._serialized_end = 13443
+ _globals["_HINT"]._serialized_start = 13446
+ _globals["_HINT"]._serialized_end = 13578
+ _globals["_UNPIVOT"]._serialized_start = 13581
+ _globals["_UNPIVOT"]._serialized_end = 13908
+ _globals["_UNPIVOT_VALUES"]._serialized_start = 13838
+ _globals["_UNPIVOT_VALUES"]._serialized_end = 13897
+ _globals["_TRANSPOSE"]._serialized_start = 13910
+ _globals["_TRANSPOSE"]._serialized_end = 14032
+ _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_start = 14034
+ _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_end = 14159
+ _globals["_TOSCHEMA"]._serialized_start = 14161
+ _globals["_TOSCHEMA"]._serialized_end = 14267
+ _globals["_REPARTITIONBYEXPRESSION"]._serialized_start = 14270
+ _globals["_REPARTITIONBYEXPRESSION"]._serialized_end = 14473
+ _globals["_MAPPARTITIONS"]._serialized_start = 14476
+ _globals["_MAPPARTITIONS"]._serialized_end = 14708
+ _globals["_GROUPMAP"]._serialized_start = 14711
+ _globals["_GROUPMAP"]._serialized_end = 15561
+ _globals["_TRANSFORMWITHSTATEINFO"]._serialized_start = 15564
+ _globals["_TRANSFORMWITHSTATEINFO"]._serialized_end = 15787
+ _globals["_COGROUPMAP"]._serialized_start = 15790
+ _globals["_COGROUPMAP"]._serialized_end = 16316
+ _globals["_APPLYINPANDASWITHSTATE"]._serialized_start = 16319
+ _globals["_APPLYINPANDASWITHSTATE"]._serialized_end = 16676
+ _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_start = 16679
+ _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_end = 16923
+ _globals["_PYTHONUDTF"]._serialized_start = 16926
+ _globals["_PYTHONUDTF"]._serialized_end = 17103
+ _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_start = 17106
+ _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_end = 17257
+ _globals["_PYTHONDATASOURCE"]._serialized_start = 17259
+ _globals["_PYTHONDATASOURCE"]._serialized_end = 17334
+ _globals["_COLLECTMETRICS"]._serialized_start = 17337
+ _globals["_COLLECTMETRICS"]._serialized_end = 17473
+ _globals["_PARSE"]._serialized_start = 17476
+ _globals["_PARSE"]._serialized_end = 17886
+ _globals["_PARSE_OPTIONSENTRY"]._serialized_start = 6157
+ _globals["_PARSE_OPTIONSENTRY"]._serialized_end = 6215
+ _globals["_PARSE_PARSEFORMAT"]._serialized_start = 17765
+ _globals["_PARSE_PARSEFORMAT"]._serialized_end = 17875
+ _globals["_ASOFJOIN"]._serialized_start = 17889
+ _globals["_ASOFJOIN"]._serialized_end = 18364
+ _globals["_LATERALJOIN"]._serialized_start = 18367
+ _globals["_LATERALJOIN"]._serialized_end = 18597
+ _globals["_NEARESTBYJOIN"]._serialized_start = 18600
+ _globals["_NEARESTBYJOIN"]._serialized_end = 18893
+ _globals["_ZIP"]._serialized_start = 18895
+ _globals["_ZIP"]._serialized_end = 18992
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index c99de778db4c..2d17e88446d6 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -112,6 +112,7 @@ class Relation(google.protobuf.message.Message):
CHUNKED_CACHED_LOCAL_RELATION_FIELD_NUMBER: builtins.int
RELATION_CHANGES_FIELD_NUMBER: builtins.int
NEAREST_BY_JOIN_FIELD_NUMBER: builtins.int
+ ZIP_FIELD_NUMBER: builtins.int
FILL_NA_FIELD_NUMBER: builtins.int
DROP_NA_FIELD_NUMBER: builtins.int
REPLACE_FIELD_NUMBER: builtins.int
@@ -226,6 +227,8 @@ class Relation(google.protobuf.message.Message):
@property
def nearest_by_join(self) -> global___NearestByJoin: ...
@property
+ def zip(self) -> global___Zip: ...
+ @property
def fill_na(self) -> global___NAFill:
"""NA functions"""
@property
@@ -314,6 +317,7 @@ class Relation(google.protobuf.message.Message):
chunked_cached_local_relation: global___ChunkedCachedLocalRelation | None = ...,
relation_changes: global___RelationChanges | None = ...,
nearest_by_join: global___NearestByJoin | None = ...,
+ zip: global___Zip | None = ...,
fill_na: global___NAFill | None = ...,
drop_na: global___NADrop | None = ...,
replace: global___NAReplace | None = ...,
@@ -459,6 +463,8 @@ class Relation(google.protobuf.message.Message):
b"with_relations",
"with_watermark",
b"with_watermark",
+ "zip",
+ b"zip",
],
) -> builtins.bool: ...
def ClearField(
@@ -590,6 +596,8 @@ class Relation(google.protobuf.message.Message):
b"with_relations",
"with_watermark",
b"with_watermark",
+ "zip",
+ b"zip",
],
) -> None: ...
def WhichOneof(
@@ -642,6 +650,7 @@ class Relation(google.protobuf.message.Message):
"chunked_cached_local_relation",
"relation_changes",
"nearest_by_join",
+ "zip",
"fill_na",
"drop_na",
"replace",
@@ -4742,3 +4751,35 @@ class NearestByJoin(google.protobuf.message.Message):
) -> None: ...
global___NearestByJoin = NearestByJoin
+
+class Zip(google.protobuf.message.Message):
+ """Relation of type [[Zip]].
+
+ Combines the columns of two DataFrames side-by-side. Both DataFrames must produce the same
+ canonicalized plan after stripping outer Project chains.
+ """
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ LEFT_FIELD_NUMBER: builtins.int
+ RIGHT_FIELD_NUMBER: builtins.int
+ @property
+ def left(self) -> global___Relation:
+ """(Required) Left input relation."""
+ @property
+ def right(self) -> global___Relation:
+ """(Required) Right input relation."""
+ def __init__(
+ self,
+ *,
+ left: global___Relation | None = ...,
+ right: global___Relation | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["left", b"left", "right", b"right"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["left", b"left", "right", b"right"]
+ ) -> None: ...
+
+global___Zip = Zip
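The docstring's precondition (the same canonicalized plan once outer `Project` chains are stripped) can be modeled in a few lines. This is a deliberate simplification, not the `ResolveZip` rule: plans are nested tuples, canonicalization is reduced to structural equality, and `strip_projects` / `check_zippable` are hypothetical names. The real analyzer reports its own error condition rather than `ValueError`.

```python
# Toy model of the Zip precondition; not the ResolveZip implementation.
from typing import Any, Tuple

# A plan node is ("Project", columns, child) or ("Read", table_name).
Plan = Tuple[Any, ...]


def strip_projects(plan: Plan) -> Plan:
    # Peel off every outer Project wrapper to reach the shared base.
    while plan[0] == "Project":
        plan = plan[2]
    return plan


def check_zippable(left: Plan, right: Plan) -> Plan:
    # Stand-in for "same canonicalized plan": plain structural equality.
    base_left, base_right = strip_projects(left), strip_projects(right)
    if base_left != base_right:
        raise ValueError(f"cannot zip: {base_left!r} != {base_right!r}")
    return base_left


base = ("Read", "t")
left = ("Project", ("a",), base)
# A chained projection on the right still reduces to the same base.
right = ("Project", ("b",), ("Project", ("a", "b"), base))
shared = check_zippable(left, right)
```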
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan.py b/python/pyspark/sql/tests/connect/test_connect_plan.py
index 097f2dc18231..d76411e07d29 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -112,6 +112,20 @@ class SparkConnectPlanTests(PlanOnlyTestFixture):
join_plan.root.join.join_type,
)
+ def test_zip(self):
+ left_input = self.connect.readTable(table_name=self.tbl_name)
+ right_input = self.connect.readTable(table_name=self.tbl_name)
+ plan = left_input.zip(right_input)._plan.to_proto(self.connect)
+ self.assertIsNotNone(plan.root.zip)
+ self.assertEqual(
+ plan.root.zip.left.read.named_table.unparsed_identifier,
+ self.tbl_name,
+ )
+ self.assertEqual(
+ plan.root.zip.right.read.named_table.unparsed_identifier,
+ self.tbl_name,
+ )
+
def test_filter(self):
df = self.connect.readTable(table_name=self.tbl_name)
plan = df.filter(df.col_name > 3)._plan.to_proto(self.connect)
diff --git a/python/pyspark/sql/tests/connect/test_parity_zip.py b/python/pyspark/sql/tests/connect/test_parity_zip.py
index b0564e63a48b..2d80848463d5 100644
--- a/python/pyspark/sql/tests/connect/test_parity_zip.py
+++ b/python/pyspark/sql/tests/connect/test_parity_zip.py
@@ -15,20 +15,12 @@
# limitations under the License.
#
-from pyspark.errors import PySparkNotImplementedError
+from pyspark.sql.tests.test_zip import DataFrameZipTestsMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
-class ZipParityTests(ReusedConnectTestCase):
- """`DataFrame.zip` is classic-only for now; assert the Connect stub raises a clean
- NOT_IMPLEMENTED instead of falling through to a generic error or appearing to work."""
-
- def test_zip_raises_not_implemented(self):
- df = self.spark.createDataFrame([(1, 2)], ["a", "b"])
- with self.assertRaises(PySparkNotImplementedError) as ctx:
- df.select("a").zip(df.select("b"))
- self.assertEqual(ctx.exception.getCondition(), "NOT_IMPLEMENTED")
- self.assertIn("zip", str(ctx.exception))
+class ZipParityTests(DataFrameZipTestsMixin, ReusedConnectTestCase):
+ pass
if __name__ == "__main__":
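The rewritten parity file reuses the classic tests by mixing `DataFrameZipTestsMixin` into the Connect-backed `ReusedConnectTestCase`, so one set of test bodies runs against both back ends. The pattern can be sketched with plain `unittest`. `ZipTestsMixin`, the `zip_impl` hook, and the two list-based implementations are hypothetical and stand in for the classic and Connect sessions.

```python
import unittest


class ZipTestsMixin:
    """Shared test bodies; each concrete class supplies `zip_impl` (hypothetical hook)."""

    def test_zip_pairs_columns(self):
        left = [(1,), (4,)]
        right = [(2,), (5,)]
        self.assertEqual(self.zip_impl(left, right), [(1, 2), (4, 5)])


def _classic_zip(left, right):
    # Stand-in for the classic code path: concatenate aligned rows.
    return [lrow + rrow for lrow, rrow in zip(left, right)]


def _connect_zip(left, right):
    # Stand-in for the Connect code path; it must produce the same result.
    return [tuple(lrow) + tuple(rrow) for lrow, rrow in zip(left, right)]


class ClassicZipTests(ZipTestsMixin, unittest.TestCase):
    zip_impl = staticmethod(_classic_zip)


class ConnectZipTests(ZipTestsMixin, unittest.TestCase):
    zip_impl = staticmethod(_connect_zip)
```

Run it with `python -m unittest`; each concrete class contributes its own copy of every mixin test, and the mixin itself is never collected because it is not a `TestCase`.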
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 199736da92ac..33aff976434c 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -738,6 +738,10 @@ class PlanGenerationTestSuite extends ConnectFunSuite with Logging {
simple.withMetadata("id", builder.build())
}
+ test("zip") {
+ left.select("id").zip(left.select("a"))
+ }
+
test("zipWithIndex") {
simple.zipWithIndex()
}
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
index 57b8080c4b13..15ad4a43bbe7 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
@@ -88,4 +88,19 @@ class DataFrameSuite extends QueryTest with RemoteSparkSession {
spark.conf.unset("spark.sql.analyzer.strictDataFrameColumnResolution")
}
}
+
+ test("zip") {
+ val sparkSession = spark
+ import sparkSession.implicits._
+
+ val df = Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c")
+ val left = df.select("a")
+ val right = df.select("b")
+
+ val zipped = left.zip(right)
+ assert(zipped.columns === Array("a", "b"))
+ val rows = zipped.collect().sortBy(_.getInt(0))
+ assert(rows(0).getInt(0) === 1 && rows(0).getInt(1) === 2)
+ assert(rows(1).getInt(0) === 4 && rows(1).getInt(1) === 5)
+ }
}
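The `DataFrameSuite` test pins down the observable result: zipping `df.select("a")` with `df.select("b")` yields columns `a` and `b` holding values from the same source rows. A plain-Python model of that row-level contract follows. It is only a semantic sketch that assumes both inputs are projections of one row set; the helper names are hypothetical, and Spark does not pair rows at run time (the `zip.explain` golden file in this commit shows the plan resolving to a single `Project` over the shared child).

```python
from typing import Dict, List

Row = Dict[str, int]


def select(rows: List[Row], *cols: str) -> List[Row]:
    """Project every row onto `cols`, keeping row order."""
    return [{c: r[c] for c in cols} for r in rows]


def zip_rows(left: List[Row], right: List[Row]) -> List[Row]:
    """Merge positionally aligned rows, left columns first.

    Assumes both inputs are projections of the same rows, so position i on
    each side refers to the same source row. Duplicate column names would
    collide in this dict-based model.
    """
    if len(left) != len(right):
        raise ValueError("inputs must have the same number of rows")
    return [{**lrow, **rrow} for lrow, rrow in zip(left, right)]


df = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
zipped = zip_rows(select(df, "a"), select(df, "b"))
print(list(zipped[0]))  # ['a', 'b']
print([(r["a"], r["b"]) for r in zipped])  # [(1, 2), (4, 5)]
```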
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/relations.proto b/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
index 95cc9281d8ca..151719743703 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -83,6 +83,7 @@ message Relation {
ChunkedCachedLocalRelation chunked_cached_local_relation = 45;
RelationChanges relation_changes = 46;
NearestByJoin nearest_by_join = 47;
+ Zip zip = 48;
// NA functions
NAFill fill_na = 90;
@@ -1307,3 +1308,15 @@ message NearestByJoin {
// (Required) Ranking direction. Must be one of: "distance", "similarity".
string direction = 7;
}
+
+// Relation of type [[Zip]].
+//
+// Combines the columns of two DataFrames side-by-side. Both DataFrames must produce the same
+// canonicalized plan after stripping outer Project chains.
+message Zip {
+ // (Required) Left input relation.
+ Relation left = 1;
+
+ // (Required) Right input relation.
+ Relation right = 2;
+}
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
index 3f7ed1a7c287..c27a83b79b89 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
@@ -348,7 +348,12 @@ class Dataset[T] private[sql] (
/** @inheritdoc */
def zip(other: sql.Dataset[_]): DataFrame = {
- throw new UnsupportedOperationException("zip is not supported in Spark Connect")
+ checkSameSparkSession(other)
+ sparkSession.newDataFrame { builder =>
+ builder.getZipBuilder
+ .setLeft(plan.getRoot)
+ .setRight(other.asInstanceOf[Dataset[_]].plan.getRoot)
+ }
}
/** @inheritdoc */
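On the Scala client, `zip` now checks that both Datasets belong to the same session and then builds a new relation whose `zip.left` and `zip.right` are the two existing plan roots. A Python model of that builder step follows. `Session`, `Frame`, `local_frame`, and `select` are hypothetical, and the sequential plan-id counter is an assumption chosen so the ids line up with the `zip.json` golden file (inputs 0 and 2, projections 1 and 3, zip 4).

```python
import itertools
from dataclasses import dataclass
from typing import Any, Dict


class Session:
    """Hypothetical client session that assigns plan ids in creation order."""

    def __init__(self) -> None:
        self._ids = itertools.count()

    def new_relation(self, body: Dict[str, Any]) -> Dict[str, Any]:
        # Wrap a relation body with a fresh planId under "common".
        return {"common": {"planId": str(next(self._ids))}, **body}


@dataclass
class Frame:
    session: Session
    root: Dict[str, Any]

    def zip(self, other: "Frame") -> "Frame":
        # Analogue of checkSameSparkSession: plans from another session are rejected.
        if other.session is not self.session:
            raise ValueError("zip requires both DataFrames to share a session")
        # Analogue of getZipBuilder.setLeft(...).setRight(...).
        body = {"zip": {"left": self.root, "right": other.root}}
        return Frame(self.session, self.session.new_relation(body))


def local_frame(session: Session, schema: str) -> Frame:
    return Frame(session, session.new_relation({"localRelation": {"schema": schema}}))


def select(frame: Frame, column: str) -> Frame:
    project = {
        "input": frame.root,
        "expressions": [{"unresolvedAttribute": {"unparsedIdentifier": column}}],
    }
    return Frame(frame.session, frame.session.new_relation({"project": project}))


spark = Session()
schema = "struct<id:bigint,a:int,b:double>"
zipped = select(local_frame(spark, schema), "id").zip(select(local_frame(spark, schema), "a"))
print(zipped.root["common"]["planId"])  # 4
```

Because both roots are embedded as-is, the server receives the full left and right subtrees and can compare them during analysis.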
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/zip.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/zip.explain
new file mode 100644
index 000000000000..c0a9b3df30b2
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/zip.explain
@@ -0,0 +1,2 @@
+Project [id#0L, a#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
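The `Zip` message comment states the precondition (both sides reduce to the same canonicalized plan once outer `Project` chains are stripped), and `zip.explain` shows the outcome for the golden query: a single `Project [id#0L, a#0]` over the shared `LocalRelation`. A toy model of that behavior follows. It is not `ResolveZip`: the node classes are hypothetical, structural equality of frozen dataclasses stands in for plan canonicalization, and only plain column selections are handled, which is what makes projecting the combined column list directly over the shared base safe here.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    """Hypothetical leaf node standing in for the shared child relation."""

    name: str
    columns: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    """Hypothetical projection that selects existing columns of its child."""

    columns: Tuple[str, ...]
    child: "Plan"


Plan = Union[Scan, Project]


def strip_projects(plan: Plan) -> Plan:
    """Peel off the chain of outer Project nodes."""
    while isinstance(plan, Project):
        plan = plan.child
    return plan


def resolve_zip(left: Plan, right: Plan) -> Project:
    """Check the shared-base precondition, then emit one Project over that base."""
    base = strip_projects(left)
    if base != strip_projects(right):
        raise ValueError("zip inputs must derive from the same plan")
    # Left output columns first, then right, as in Project [id#0L, a#0].
    return Project(left.columns + right.columns, base)


t = Scan("t", ("id", "a", "b"))
print(resolve_zip(Project(("id",), t), Project(("a",), t)).columns)  # ('id', 'a')
```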
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zip.json b/sql/connect/common/src/test/resources/query-tests/queries/zip.json
new file mode 100644
index 000000000000..e9e906fcd6f3
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/queries/zip.json
@@ -0,0 +1,81 @@
+{
+ "common": {
+ "planId": "4"
+ },
+ "zip": {
+ "left": {
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.connect.Dataset",
+ "methodName": "select",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }]
+ }
+ },
+ "right": {
+ "common": {
+ "planId": "3"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "2"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.connect.Dataset",
+ "methodName": "select",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zip.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zip.proto.bin
new file mode 100644
index 000000000000..478f6814f1e9
Binary files /dev/null and b/sql/connect/common/src/test/resources/query-tests/queries/zip.proto.bin differ
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index c84eaadaa453..b067efa0579a 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -161,6 +161,7 @@ class SparkConnectPlanner(
case proto.Relation.RelTypeCase.LATERAL_JOIN => transformLateralJoin(rel.getLateralJoin)
case proto.Relation.RelTypeCase.NEAREST_BY_JOIN => transformNearestByJoin(rel.getNearestByJoin)
+ case proto.Relation.RelTypeCase.ZIP => transformZip(rel.getZip)
case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp)
case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
@@ -2591,6 +2592,11 @@ class SparkConnectPlanner(
.logicalPlan
}
+ private def transformZip(rel: proto.Zip): LogicalPlan = {
+ assertPlan(rel.hasLeft && rel.hasRight, "Both zip sides must be present")
+ logical.Zip(transformRelation(rel.getLeft), transformRelation(rel.getRight))
+ }
+
private def transformSort(sort: proto.Sort): LogicalPlan = {
assertPlan(sort.getOrderCount > 0, "'order' must be present and contain elements.")
logical.Sort(
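On the server, the new `RelTypeCase.ZIP` branch hands off to `transformZip`, which requires both sides, converts each child relation recursively, and returns an unresolved `logical.Zip` for the analyzer to resolve later. The dispatch can be modeled in Python over dict-shaped relations. `transform_relation`, `transform_zip`, and the tuple-shaped plan nodes are hypothetical, and only two relation kinds are handled.

```python
from typing import Any, Callable, Dict


def transform_relation(rel: Dict[str, Any]) -> Any:
    """Dispatch on the single relation-type key, like matching on RelTypeCase."""
    handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {
        "localRelation": lambda body: ("LocalRelation", body.get("schema")),
        "zip": transform_zip,
    }
    # "common" carries metadata such as planId; the other key selects the type.
    kinds = [key for key in rel if key != "common"]
    if len(kinds) != 1 or kinds[0] not in handlers:
        raise ValueError(f"unsupported relation type: {kinds}")
    return handlers[kinds[0]](rel[kinds[0]])


def transform_zip(body: Dict[str, Any]) -> Any:
    # Analogue of assertPlan(rel.hasLeft && rel.hasRight, ...).
    if "left" not in body or "right" not in body:
        raise ValueError("Both zip sides must be present")
    # Return an unresolved node; resolution happens later, during analysis.
    return ("Zip", transform_relation(body["left"]), transform_relation(body["right"]))


leaf = {"localRelation": {"schema": "struct<id:bigint>"}}
print(transform_relation({"common": {"planId": "2"}, "zip": {"left": leaf, "right": leaf}}))
```

Keeping the planner step a direct construction of the unresolved node, as the PR description notes, leaves all validation of the shared-base requirement to `ResolveZip`, the same rule the classic path uses.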