This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d765114d6e [SPARK-43046][SS][CONNECT] Implemented Python API 
dropDuplicatesWithinWatermark for Spark Connect
4d765114d6e is described below

commit 4d765114d6e5dd1a78a7ad798750e7bc400a72a6
Author: bogao007 <[email protected]>
AuthorDate: Sat Apr 22 09:21:16 2023 +0900

    [SPARK-43046][SS][CONNECT] Implemented Python API 
dropDuplicatesWithinWatermark for Spark Connect
    
    ### What changes were proposed in this pull request?
    
    Implemented the `dropDuplicatesWithinWatermark` Python API for Spark Connect.
    This change builds on a previous
    [commit](https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103)
    that introduced the `dropDuplicatesWithinWatermark` API in Spark.
    
    ### Why are the changes needed?
    
    We recently introduced the `dropDuplicatesWithinWatermark` API in Spark
    ([commit link](https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103)).
    This change brings the Spark Connect Python client to parity with it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a new public API, `dropDuplicatesWithinWatermark`, in the
    Spark Connect Python client.
    
    ### How was this patch tested?
    
    Added new test cases to `SparkConnectPlannerSuite`, `SparkConnectProtoSuite`, and
    `test_connect_basic.py`.
    
    Closes #40834 from bogao007/drop-dup-watermark.
    
    Authored-by: bogao007 <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/protobuf/spark/connect/relations.proto    |   3 +
 .../org/apache/spark/sql/connect/dsl/package.scala |  11 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   8 +-
 .../connect/planner/SparkConnectPlannerSuite.scala |  30 ++++
 .../connect/planner/SparkConnectProtoSuite.scala   |  10 ++
 python/pyspark/sql/connect/dataframe.py            |  22 ++-
 python/pyspark/sql/connect/plan.py                 |   3 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 152 ++++++++++-----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  17 +++
 .../sql/tests/connect/test_connect_basic.py        |  10 +-
 10 files changed, 183 insertions(+), 83 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 57bdf57c9cb..29e701a42cf 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -362,6 +362,9 @@ message Deduplicate {
   //
   // This field does not co-use with `column_names`.
   optional bool all_columns_as_keys = 3;
+
+  // (Optional) Deduplicate within the time range of watermark.
+  optional bool within_watermark = 4;
 }
 
 // A relation that does not need to be qualified by name.
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 21b9180ccfb..25d722cf58d 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -596,6 +596,17 @@ package object dsl {
               .addAllColumnNames(colNames.asJava))
           .build()
 
+      def deduplicateWithinWatermark(colNames: Seq[String]): Relation =
+        Relation
+          .newBuilder()
+          .setDeduplicate(
+            Deduplicate
+              .newBuilder()
+              .setInput(logicalPlan)
+              .addAllColumnNames(colNames.asJava)
+              .setWithinWatermark(true))
+          .build()
+
       def distinct(): Relation =
         Relation
           .newBuilder()
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index ef502e282ee..ba394396077 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, 
LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, 
CommandResult, Deduplicate, DeserializeToObject, Except, Intersect, 
LocalRelation, LogicalPlan, MapPartitions, Project, Sample, 
SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot, 
UnresolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, 
CommandResult, Deduplicate, DeduplicateWithinWatermark, DeserializeToObject, 
Except, Intersect, LocalRelation, LogicalPlan, MapPartitions, Project, Sample, 
SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot, 
UnresolvedHint}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
 import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, 
UdfPacket}
@@ -738,7 +738,8 @@ class SparkConnectPlanner(val session: SparkSession) {
     val resolver = session.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
     if (rel.getAllColumnsAsKeys) {
-      Deduplicate(allColumns, queryExecution.analyzed)
+      if (rel.getWithinWatermark) DeduplicateWithinWatermark(allColumns, 
queryExecution.analyzed)
+      else Deduplicate(allColumns, queryExecution.analyzed)
     } else {
       val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
       val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
@@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         cols
       }
-      Deduplicate(groupCols, queryExecution.analyzed)
+      if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols, 
queryExecution.analyzed)
+      else Deduplicate(groupCols, queryExecution.analyzed)
     }
   }
 
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index ec2362d5a56..8dac0b166b6 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -381,6 +381,36 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
     assert(e2.getMessage.contains("either deduplicate on all columns or a 
subset of columns"))
   }
 
+  test("Test invalid deduplicateWithinWatermark") {
+    val deduplicateWithinWatermark = proto.Deduplicate
+      .newBuilder()
+      .setInput(readRel)
+      .setAllColumnsAsKeys(true)
+      .addColumnNames("test")
+      .setWithinWatermark(true)
+
+    val e = intercept[InvalidPlanInput] {
+      transform(
+        proto.Relation.newBuilder
+          .setDeduplicate(deduplicateWithinWatermark)
+          .build())
+    }
+    assert(
+      e.getMessage.contains("Cannot deduplicate on both all columns and a 
subset of columns"))
+
+    val deduplicateWithinWatermark2 = proto.Deduplicate
+      .newBuilder()
+      .setInput(readRel)
+      .setWithinWatermark(true)
+    val e2 = intercept[InvalidPlanInput] {
+      transform(
+        proto.Relation.newBuilder
+          .setDeduplicate(deduplicateWithinWatermark2)
+          .build())
+    }
+    assert(e2.getMessage.contains("either deduplicate on all columns or a 
subset of columns"))
+  }
+
   test("Test invalid intersect, except") {
     // Except with union_by_name=true
     val except = proto.SetOperation
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 824ee7aceb4..96dae647db6 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -348,6 +348,16 @@ class SparkConnectProtoSuite extends PlanTest with 
SparkConnectPlanTest {
     comparePlans(connectPlan2, sparkPlan2)
   }
 
+  test("Test basic deduplicateWithinWatermark") {
+    val connectPlan = connectTestRelation.distinct()
+    val sparkPlan = sparkTestRelation.distinct()
+    comparePlans(connectPlan, sparkPlan)
+
+    val connectPlan2 = 
connectTestRelation.deduplicateWithinWatermark(Seq("id", "name"))
+    val sparkPlan2 = sparkTestRelation.dropDuplicatesWithinWatermark(Seq("id", 
"name"))
+    comparePlans(connectPlan2, sparkPlan2)
+  }
+
   test("Test union, except, intersect") {
     val connectPlan1 = connectTestRelation.except(connectTestRelation, isAll = 
false)
     val sparkPlan1 = sparkTestRelation.except(sparkTestRelation)
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 87eb6df5cba..3b39b82196e 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -379,7 +379,26 @@ class DataFrame:
     drop_duplicates = dropDuplicates
 
     def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
-        raise NotImplementedError("dropDuplicatesWithinWatermark() is not 
implemented.")
+        if subset is not None and not isinstance(subset, (list, tuple)):
+            raise PySparkTypeError(
+                error_class="NOT_LIST_OR_TUPLE",
+                message_parameters={"arg_name": "subset", "arg_type": 
type(subset).__name__},
+            )
+
+        if subset is None:
+            return DataFrame.withPlan(
+                plan.Deduplicate(child=self._plan, all_columns_as_keys=True, 
within_watermark=True),
+                session=self._session,
+            )
+        else:
+            return DataFrame.withPlan(
+                plan.Deduplicate(child=self._plan, column_names=subset, 
within_watermark=True),
+                session=self._session,
+            )
+
+    dropDuplicatesWithinWatermark.__doc__ = 
PySparkDataFrame.dropDuplicatesWithinWatermark.__doc__
+
+    drop_duplicates_within_watermark = dropDuplicatesWithinWatermark
 
     def distinct(self) -> "DataFrame":
         return DataFrame.withPlan(
@@ -595,7 +614,6 @@ class DataFrame:
         fraction: Optional[Union[int, float]] = None,
         seed: Optional[int] = None,
     ) -> "DataFrame":
-
         # For the cases below:
         #   sample(True, 0.5 [, seed])
         #   sample(True, fraction=0.5 [, seed])
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 9e221814f12..8b6dda0b1ca 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -609,16 +609,19 @@ class Deduplicate(LogicalPlan):
         child: Optional["LogicalPlan"],
         all_columns_as_keys: bool = False,
         column_names: Optional[List[str]] = None,
+        within_watermark: bool = False,
     ) -> None:
         super().__init__(child)
         self.all_columns_as_keys = all_columns_as_keys
         self.column_names = column_names
+        self.within_watermark = within_watermark
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
         plan = self._create_proto_relation()
         plan.deduplicate.input.CopyFrom(self._child.plan(session))
         plan.deduplicate.all_columns_as_keys = self.all_columns_as_keys
+        plan.deduplicate.within_watermark = self.within_watermark
         if self.column_names is not None:
             plan.deduplicate.column_names.extend(self.column_names)
         return plan
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py 
b/python/pyspark/sql/connect/proto/relations_pb2.py
index 6a4226185e7..b8b84f27743 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as 
spark_dot_connect_dot_catal
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x99\x16\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x99\x16\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 
@@ -806,81 +806,81 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _DROP._serialized_start = 6124
     _DROP._serialized_end = 6265
     _DEDUPLICATE._serialized_start = 6268
-    _DEDUPLICATE._serialized_end = 6439
-    _LOCALRELATION._serialized_start = 6441
-    _LOCALRELATION._serialized_end = 6530
-    _SAMPLE._serialized_start = 6533
-    _SAMPLE._serialized_end = 6806
-    _RANGE._serialized_start = 6809
-    _RANGE._serialized_end = 6954
-    _SUBQUERYALIAS._serialized_start = 6956
-    _SUBQUERYALIAS._serialized_end = 7070
-    _REPARTITION._serialized_start = 7073
-    _REPARTITION._serialized_end = 7215
-    _SHOWSTRING._serialized_start = 7218
-    _SHOWSTRING._serialized_end = 7360
-    _HTMLSTRING._serialized_start = 7362
-    _HTMLSTRING._serialized_end = 7476
-    _STATSUMMARY._serialized_start = 7478
-    _STATSUMMARY._serialized_end = 7570
-    _STATDESCRIBE._serialized_start = 7572
-    _STATDESCRIBE._serialized_end = 7653
-    _STATCROSSTAB._serialized_start = 7655
-    _STATCROSSTAB._serialized_end = 7756
-    _STATCOV._serialized_start = 7758
-    _STATCOV._serialized_end = 7854
-    _STATCORR._serialized_start = 7857
-    _STATCORR._serialized_end = 7994
-    _STATAPPROXQUANTILE._serialized_start = 7997
-    _STATAPPROXQUANTILE._serialized_end = 8161
-    _STATFREQITEMS._serialized_start = 8163
-    _STATFREQITEMS._serialized_end = 8288
-    _STATSAMPLEBY._serialized_start = 8291
-    _STATSAMPLEBY._serialized_end = 8600
-    _STATSAMPLEBY_FRACTION._serialized_start = 8492
-    _STATSAMPLEBY_FRACTION._serialized_end = 8591
-    _NAFILL._serialized_start = 8603
-    _NAFILL._serialized_end = 8737
-    _NADROP._serialized_start = 8740
-    _NADROP._serialized_end = 8874
-    _NAREPLACE._serialized_start = 8877
-    _NAREPLACE._serialized_end = 9173
-    _NAREPLACE_REPLACEMENT._serialized_start = 9032
-    _NAREPLACE_REPLACEMENT._serialized_end = 9173
-    _TODF._serialized_start = 9175
-    _TODF._serialized_end = 9263
-    _WITHCOLUMNSRENAMED._serialized_start = 9266
-    _WITHCOLUMNSRENAMED._serialized_end = 9505
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9438
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9505
-    _WITHCOLUMNS._serialized_start = 9507
-    _WITHCOLUMNS._serialized_end = 9626
-    _WITHWATERMARK._serialized_start = 9629
-    _WITHWATERMARK._serialized_end = 9763
-    _HINT._serialized_start = 9766
-    _HINT._serialized_end = 9898
-    _UNPIVOT._serialized_start = 9901
-    _UNPIVOT._serialized_end = 10228
-    _UNPIVOT_VALUES._serialized_start = 10158
-    _UNPIVOT_VALUES._serialized_end = 10217
-    _TOSCHEMA._serialized_start = 10230
-    _TOSCHEMA._serialized_end = 10336
-    _REPARTITIONBYEXPRESSION._serialized_start = 10339
-    _REPARTITIONBYEXPRESSION._serialized_end = 10542
-    _MAPPARTITIONS._serialized_start = 10545
-    _MAPPARTITIONS._serialized_end = 10726
-    _GROUPMAP._serialized_start = 10729
-    _GROUPMAP._serialized_end = 10932
-    _COGROUPMAP._serialized_start = 10935
-    _COGROUPMAP._serialized_end = 11287
-    _APPLYINPANDASWITHSTATE._serialized_start = 11290
-    _APPLYINPANDASWITHSTATE._serialized_end = 11647
-    _COLLECTMETRICS._serialized_start = 11650
-    _COLLECTMETRICS._serialized_end = 11786
-    _PARSE._serialized_start = 11789
-    _PARSE._serialized_end = 12177
+    _DEDUPLICATE._serialized_end = 6508
+    _LOCALRELATION._serialized_start = 6510
+    _LOCALRELATION._serialized_end = 6599
+    _SAMPLE._serialized_start = 6602
+    _SAMPLE._serialized_end = 6875
+    _RANGE._serialized_start = 6878
+    _RANGE._serialized_end = 7023
+    _SUBQUERYALIAS._serialized_start = 7025
+    _SUBQUERYALIAS._serialized_end = 7139
+    _REPARTITION._serialized_start = 7142
+    _REPARTITION._serialized_end = 7284
+    _SHOWSTRING._serialized_start = 7287
+    _SHOWSTRING._serialized_end = 7429
+    _HTMLSTRING._serialized_start = 7431
+    _HTMLSTRING._serialized_end = 7545
+    _STATSUMMARY._serialized_start = 7547
+    _STATSUMMARY._serialized_end = 7639
+    _STATDESCRIBE._serialized_start = 7641
+    _STATDESCRIBE._serialized_end = 7722
+    _STATCROSSTAB._serialized_start = 7724
+    _STATCROSSTAB._serialized_end = 7825
+    _STATCOV._serialized_start = 7827
+    _STATCOV._serialized_end = 7923
+    _STATCORR._serialized_start = 7926
+    _STATCORR._serialized_end = 8063
+    _STATAPPROXQUANTILE._serialized_start = 8066
+    _STATAPPROXQUANTILE._serialized_end = 8230
+    _STATFREQITEMS._serialized_start = 8232
+    _STATFREQITEMS._serialized_end = 8357
+    _STATSAMPLEBY._serialized_start = 8360
+    _STATSAMPLEBY._serialized_end = 8669
+    _STATSAMPLEBY_FRACTION._serialized_start = 8561
+    _STATSAMPLEBY_FRACTION._serialized_end = 8660
+    _NAFILL._serialized_start = 8672
+    _NAFILL._serialized_end = 8806
+    _NADROP._serialized_start = 8809
+    _NADROP._serialized_end = 8943
+    _NAREPLACE._serialized_start = 8946
+    _NAREPLACE._serialized_end = 9242
+    _NAREPLACE_REPLACEMENT._serialized_start = 9101
+    _NAREPLACE_REPLACEMENT._serialized_end = 9242
+    _TODF._serialized_start = 9244
+    _TODF._serialized_end = 9332
+    _WITHCOLUMNSRENAMED._serialized_start = 9335
+    _WITHCOLUMNSRENAMED._serialized_end = 9574
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9507
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9574
+    _WITHCOLUMNS._serialized_start = 9576
+    _WITHCOLUMNS._serialized_end = 9695
+    _WITHWATERMARK._serialized_start = 9698
+    _WITHWATERMARK._serialized_end = 9832
+    _HINT._serialized_start = 9835
+    _HINT._serialized_end = 9967
+    _UNPIVOT._serialized_start = 9970
+    _UNPIVOT._serialized_end = 10297
+    _UNPIVOT_VALUES._serialized_start = 10227
+    _UNPIVOT_VALUES._serialized_end = 10286
+    _TOSCHEMA._serialized_start = 10299
+    _TOSCHEMA._serialized_end = 10405
+    _REPARTITIONBYEXPRESSION._serialized_start = 10408
+    _REPARTITIONBYEXPRESSION._serialized_end = 10611
+    _MAPPARTITIONS._serialized_start = 10614
+    _MAPPARTITIONS._serialized_end = 10795
+    _GROUPMAP._serialized_start = 10798
+    _GROUPMAP._serialized_end = 11001
+    _COGROUPMAP._serialized_start = 11004
+    _COGROUPMAP._serialized_end = 11356
+    _APPLYINPANDASWITHSTATE._serialized_start = 11359
+    _APPLYINPANDASWITHSTATE._serialized_end = 11716
+    _COLLECTMETRICS._serialized_start = 11719
+    _COLLECTMETRICS._serialized_end = 11855
+    _PARSE._serialized_start = 11858
+    _PARSE._serialized_end = 12246
     _PARSE_OPTIONSENTRY._serialized_start = 3597
     _PARSE_OPTIONSENTRY._serialized_end = 3655
-    _PARSE_PARSEFORMAT._serialized_start = 12078
-    _PARSE_PARSEFORMAT._serialized_end = 12166
+    _PARSE_PARSEFORMAT._serialized_start = 12147
+    _PARSE_PARSEFORMAT._serialized_end = 12235
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi 
b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index b847378d78b..3dab4f8525e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -1436,6 +1436,7 @@ class Deduplicate(google.protobuf.message.Message):
     INPUT_FIELD_NUMBER: builtins.int
     COLUMN_NAMES_FIELD_NUMBER: builtins.int
     ALL_COLUMNS_AS_KEYS_FIELD_NUMBER: builtins.int
+    WITHIN_WATERMARK_FIELD_NUMBER: builtins.int
     @property
     def input(self) -> global___Relation:
         """(Required) Input relation for a Deduplicate."""
@@ -1452,22 +1453,29 @@ class Deduplicate(google.protobuf.message.Message):
 
     This field does not co-use with `column_names`.
     """
+    within_watermark: builtins.bool
+    """(Optional) Deduplicate within the time range of watermark."""
     def __init__(
         self,
         *,
         input: global___Relation | None = ...,
         column_names: collections.abc.Iterable[builtins.str] | None = ...,
         all_columns_as_keys: builtins.bool | None = ...,
+        within_watermark: builtins.bool | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
             "_all_columns_as_keys",
             b"_all_columns_as_keys",
+            "_within_watermark",
+            b"_within_watermark",
             "all_columns_as_keys",
             b"all_columns_as_keys",
             "input",
             b"input",
+            "within_watermark",
+            b"within_watermark",
         ],
     ) -> builtins.bool: ...
     def ClearField(
@@ -1475,18 +1483,27 @@ class Deduplicate(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "_all_columns_as_keys",
             b"_all_columns_as_keys",
+            "_within_watermark",
+            b"_within_watermark",
             "all_columns_as_keys",
             b"all_columns_as_keys",
             "column_names",
             b"column_names",
             "input",
             b"input",
+            "within_watermark",
+            b"within_watermark",
         ],
     ) -> None: ...
+    @typing.overload
     def WhichOneof(
         self,
         oneof_group: typing_extensions.Literal["_all_columns_as_keys", 
b"_all_columns_as_keys"],
     ) -> typing_extensions.Literal["all_columns_as_keys"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_within_watermark", 
b"_within_watermark"]
+    ) -> typing_extensions.Literal["within_watermark"] | None: ...
 
 global___Deduplicate = Deduplicate
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index b316f0f3b4c..d414316c2f9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -1213,6 +1213,14 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             df.dropDuplicates(["name"]).toPandas(), 
df2.dropDuplicates(["name"]).toPandas()
         )
 
+    def test_deduplicate_within_watermark_in_batch(self):
+        df = self.connect.read.table(self.tbl_name)
+        with self.assertRaisesRegex(
+            AnalysisException,
+            "dropDuplicatesWithinWatermark is not supported with batch 
DataFrames/DataSets",
+        ):
+            df.dropDuplicatesWithinWatermark().toPandas()
+
     def test_first(self):
         # SPARK-41002: test `first` API in Python Client
         df = self.connect.read.table(self.tbl_name)
@@ -1761,7 +1769,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             self.connect.read.table(self.tbl_name).hint("REPARTITION", "id", 
3).toPandas()
 
     def test_join_hint(self):
-
         cdf1 = self.connect.createDataFrame([(2, "Alice"), (5, "Bob")], 
schema=["age", "name"])
         cdf2 = self.connect.createDataFrame(
             [Row(height=80, name="Tom"), Row(height=85, name="Bob")]
@@ -2284,7 +2291,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         )
 
     def test_grouped_data(self):
-
         query = """
             SELECT * FROM VALUES
                 ('James', 'Sales', 3000, 2020),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to