This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new ccaa92b  [SPARK-51995] Support `toDF`, `distinct` and `dropDuplicates(WithinWatermark)?` in `DataFrame`
ccaa92b is described below

commit ccaa92b5cdb3870f41f9ff1815932548cc0864d6
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat May 3 19:35:03 2025 -0700

    [SPARK-51995] Support `toDF`, `distinct` and `dropDuplicates(WithinWatermark)?` in `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support the following APIs in `DataFrame`; a short usage sketch follows the list.
    
    - `toDF`
    - `distinct`
    - `dropDuplicates`
    - `dropDuplicatesWithinWatermark`
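
    For illustration, a minimal usage sketch of the new APIs (assuming a reachable Spark Connect server; the session setup mirrors the tests below):

    ```swift
    let spark = try await SparkSession.builder.getOrCreate()

    // `toDF` with no arguments keeps the current columns; with names, it selects them.
    let ids = try await spark.range(1).toDF("id")

    // `distinct()` is an alias for `dropDuplicates()` with no column names.
    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
    let unique = try await df.distinct().count()                            // 3
    let byColumn = try await df.dropDuplicates("a").count()                 // 3
    let withinWm = try await df.dropDuplicatesWithinWatermark("a").count()  // 3

    await spark.stop()
    ```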
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
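
    Locally, one could presumably also run the new tests with SwiftPM against a running Spark Connect server (the test filter is illustrative):

    ```
    swift test --filter DataFrameTests
    ```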
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #111 from dongjoon-hyun/SPARK-51995.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift          | 43 ++++++++++++++++++++++++++-
 Sources/SparkConnect/SparkConnectClient.swift | 19 ++++++++++++
 Tests/SparkConnectTests/DataFrameTests.swift  | 35 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index ad80b07..481b215 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -91,6 +91,7 @@ import Synchronization
 /// - ``show(_:_:_:)``
 ///
 /// ### Transformation Operations
+/// - ``toDF(_:)``
 /// - ``select(_:)``
 /// - ``selectExpr(_:)``
 /// - ``filter(_:)``
@@ -100,6 +101,9 @@ import Synchronization
 /// - ``limit(_:)``
 /// - ``offset(_:)``
 /// - ``drop(_:)``
+/// - ``dropDuplicates(_:)``
+/// - ``dropDuplicatesWithinWatermark(_:)``
+/// - ``distinct()``
 /// - ``withColumnRenamed(_:_:)``
 ///
 /// ### Join Operations
@@ -440,13 +444,25 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: plan)
   }
 
-  /// Projects a set of expressions and returns a new ``DataFrame``.
+  /// Selects a subset of existing columns using column names.
   /// - Parameter cols: Column names
  /// - Returns: A ``DataFrame`` with a subset of columns.
   public func select(_ cols: String...) -> DataFrame {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
   }
 
+  /// Selects a subset of existing columns using column names. If no names are given,
+  /// this returns a ``DataFrame`` with the same columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame`` with a subset of columns.
+  public func toDF(_ cols: String...) -> DataFrame {
+    let df = if cols.isEmpty {
+      DataFrame(spark: self.spark, plan: self.plan)
+    } else {
+      DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
+    }
+    return df
+  }
+
   /// Projects a set of expressions and returns a new ``DataFrame``.
   /// - Parameter exprs: Expression strings
  /// - Returns: A ``DataFrame`` with a subset of columns.
@@ -461,6 +477,24 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getDrop(self.plan.root, cols))
   }
 
+  /// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
+  /// If column names are given, Spark considers only those columns; otherwise, this is an
+  /// alias for `distinct`.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicates(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: false)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
+  /// Returns a new Dataset with duplicate rows removed, within watermark.
+  /// If column names are given, Spark considers only those columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicatesWithinWatermark(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: true)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
  /// Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain `existingName`.
  /// - Parameters:
  ///   - existingName: An existing column name to be renamed.
@@ -1108,6 +1142,13 @@ public actor DataFrame: Sendable {
     return buildRepartition(numPartitions: numPartitions, shuffle: false)
   }
 
+  /// Returns a new ``Dataset`` that contains only the unique rows from this ``Dataset``.
+  /// This is an alias for `dropDuplicates`.
+  /// - Returns: A ``DataFrame``.
+  public func distinct() -> DataFrame {
+    return dropDuplicates()
+  }
+
   /// Groups the DataFrame using the specified columns.
   ///
   /// This method is used to perform aggregations on groups of data.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index f1e396f..4b55584 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -455,6 +455,25 @@ public actor SparkConnectClient {
     return plan
   }
 
+  static func getDropDuplicates(
+    _ child: Relation,
+    _ columnNames: [String],
+    withinWatermark: Bool = false
+  ) -> Plan {
+    var deduplicate = Spark_Connect_Deduplicate()
+    deduplicate.input = child
+    if columnNames.isEmpty {
+      // An empty list means every column becomes a deduplication key.
+      deduplicate.allColumnsAsKeys = true
+    } else {
+      deduplicate.columnNames = columnNames
+    }
+    var relation = Relation()
+    relation.deduplicate = deduplicate
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   static func getSort(_ child: Relation, _ cols: [String]) -> Plan {
     var sort = Sort()
     sort.input = child
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 13bc7c4..da53fe4 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -183,6 +183,7 @@ struct DataFrameTests {
   @Test
   func select() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).select().columns.isEmpty)
     let schema = try await spark.range(1).select("id").schema
     #expect(
       schema
@@ -191,6 +192,14 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func toDF() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).toDF().columns == ["id"])
+    #expect(try await spark.range(1).toDF("id").columns == ["id"])
+    await spark.stop()
+  }
+
   @Test
   func selectMultipleColumns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
@@ -647,6 +656,32 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func distinct() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.distinct().count() == 3)
+    await spark.stop()
+  }
+
+  @Test
+  func dropDuplicates() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.dropDuplicates().count() == 3)
+    #expect(try await df.dropDuplicates("a").count() == 3)
+    await spark.stop()
+  }
+
+  @Test
+  func dropDuplicatesWithinWatermark() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.dropDuplicatesWithinWatermark().count() == 3)
+    #expect(try await df.dropDuplicatesWithinWatermark("a").count() == 3)
+    await spark.stop()
+  }
+
   @Test
   func groupBy() async throws {
     let spark = try await SparkSession.builder.getOrCreate()


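A note on `dropDuplicates` semantics, since the tests above use single-column data where every variant returns the same count: with multiple columns, `dropDuplicates()` keys on all columns (via `allColumnsAsKeys`), while `dropDuplicates("a")` keys only on the named column. A minimal sketch with hypothetical two-column data (assumes a reachable Spark Connect server):

```swift
let spark = try await SparkSession.builder.getOrCreate()
// Rows (1, 'x') and (1, 'y') differ only in column `b`.
let df = try await spark.sql("SELECT * FROM VALUES (1, 'x'), (1, 'y'), (2, 'x') T(a, b)")
let allCols = try await df.dropDuplicates().count()  // 3: every (a, b) pair is unique
let byA = try await df.dropDuplicates("a").count()   // 2: the two a == 1 rows collapse to one
await spark.stop()
```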