This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new ccaa92b  [SPARK-51995] Support `toDF`, `distinct` and `dropDuplicates(WithinWatermark)?` in `DataFrame`
ccaa92b is described below

commit ccaa92b5cdb3870f41f9ff1815932548cc0864d6
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat May 3 19:35:03 2025 -0700

    [SPARK-51995] Support `toDF`, `distinct` and `dropDuplicates(WithinWatermark)?` in `DataFrame`

    ### What changes were proposed in this pull request?

    This PR aims to support the following APIs in `DataFrame`.
    - `toDF`
    - `distinct`
    - `dropDuplicates`
    - `dropDuplicatesWithinWatermark`

    ### Why are the changes needed?

    For feature parity.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #111 from dongjoon-hyun/SPARK-51995.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift          | 43 ++++++++++++++++++++++++++-
 Sources/SparkConnect/SparkConnectClient.swift | 19 ++++++++++++
 Tests/SparkConnectTests/DataFrameTests.swift  | 35 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index ad80b07..481b215 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -91,6 +91,7 @@ import Synchronization
 /// - ``show(_:_:_:)``
 ///
 /// ### Transformation Operations
+/// - ``toDF(_:)``
 /// - ``select(_:)``
 /// - ``selectExpr(_:)``
 /// - ``filter(_:)``
@@ -100,6 +101,9 @@ import Synchronization
 /// - ``limit(_:)``
 /// - ``offset(_:)``
 /// - ``drop(_:)``
+/// - ``dropDuplicates(_:)``
+/// - ``dropDuplicatesWithinWatermark(_:)``
+/// - ``distinct()``
 /// - ``withColumnRenamed(_:_:)``
 ///
 /// ### Join Operations
@@ -440,13 +444,25 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: plan)
   }

-  /// Projects a set of expressions and returns a new ``DataFrame``.
+  /// Selects a subset of existing columns using column names.
   /// - Parameter cols: Column names
   /// - Returns: A ``DataFrame`` with subset of columns.
   public func select(_ cols: String...) -> DataFrame {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
   }

+  /// Selects a subset of existing columns using column names.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame`` with subset of columns.
+  public func toDF(_ cols: String...) -> DataFrame {
+    let df = if cols.isEmpty {
+      DataFrame(spark: self.spark, plan: self.plan)
+    } else {
+      DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
+    }
+    return df
+  }
+
   /// Projects a set of expressions and returns a new ``DataFrame``.
   /// - Parameter exprs: Expression strings
   /// - Returns: A ``DataFrame`` with subset of columns.
@@ -461,6 +477,24 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getDrop(self.plan.root, cols))
   }

+  /// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
+  /// This is an alias for `distinct`. If column names are given, Spark considers only those columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicates(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: false)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
+  /// Returns a new ``DataFrame`` with duplicate rows removed, within watermark.
+  /// If column names are given, Spark considers only those columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicatesWithinWatermark(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: true)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
   /// Returns a new Dataset with a column renamed. This is a no-op if schema doesn't contain existingName.
   /// - Parameters:
   ///   - existingName: A existing column name to be renamed.
@@ -1108,6 +1142,13 @@ public actor DataFrame: Sendable {
     return buildRepartition(numPartitions: numPartitions, shuffle: false)
   }

+  /// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
+  /// This is an alias for `dropDuplicates`.
+  /// - Returns: A ``DataFrame``.
+  public func distinct() -> DataFrame {
+    return dropDuplicates()
+  }
+
   /// Groups the DataFrame using the specified columns.
   ///
   /// This method is used to perform aggregations on groups of data.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index f1e396f..4b55584 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -455,6 +455,25 @@ public actor SparkConnectClient {
     return plan
   }

+  static func getDropDuplicates(
+    _ child: Relation,
+    _ columnNames: [String],
+    withinWatermark: Bool = false
+  ) -> Plan {
+    var deduplicate = Spark_Connect_Deduplicate()
+    deduplicate.input = child
+    if columnNames.isEmpty {
+      deduplicate.allColumnsAsKeys = true
+    } else {
+      deduplicate.columnNames = columnNames
+    }
+    var relation = Relation()
+    relation.deduplicate = deduplicate
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   static func getSort(_ child: Relation, _ cols: [String]) -> Plan {
     var sort = Sort()
     sort.input = child
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 13bc7c4..da53fe4 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -183,6 +183,7 @@ struct DataFrameTests {
   @Test
   func select() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).select().columns.isEmpty)
     let schema = try await spark.range(1).select("id").schema
     #expect(
       schema
@@ -191,6 +192,14 @@ struct DataFrameTests {
     await spark.stop()
   }

+  @Test
+  func toDF() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).toDF().columns == ["id"])
+    #expect(try await spark.range(1).toDF("id").columns == ["id"])
+    await spark.stop()
+  }
+
   @Test
   func selectMultipleColumns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
@@ -647,6 +656,32 @@ struct DataFrameTests {
     await spark.stop()
   }

+  @Test
+  func distinct() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.distinct().count() == 3)
+    await spark.stop()
+  }
+
+  @Test
+  func dropDuplicates() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)") + #expect(try await df.dropDuplicates().count() == 3) + #expect(try await df.dropDuplicates("a").count() == 3) + await spark.stop() + } + + @Test + func dropDuplicatesWithinWatermark() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)") + #expect(try await df.dropDuplicatesWithinWatermark().count() == 3) + #expect(try await df.dropDuplicatesWithinWatermark("a").count() == 3) + await spark.stop() + } + @Test func groupBy() async throws { let spark = try await SparkSession.builder.getOrCreate() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org