This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new ea3ea2a  [SPARK-51807] Support `drop` and `withColumnRenamed` in `DataFrame`
ea3ea2a is described below

commit ea3ea2ae87137d079f89429441c14512fe380777
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Apr 15 22:36:36 2025 +0900

    [SPARK-51807] Support `drop` and `withColumnRenamed` in `DataFrame`

    ### What changes were proposed in this pull request?

    This PR aims to support `drop` and three `withColumnRenamed` APIs.

    ### Why are the changes needed?

    For feature parity.

    ### Does this PR introduce _any_ user-facing change?

    No, this is a new addition.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #60 from dongjoon-hyun/SPARK-51807.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift          | 33 +++++++++++++++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift | 22 ++++++++++++++++++
 Sources/SparkConnect/TypeAliases.swift        |  2 ++
 Tests/SparkConnectTests/DataFrameTests.swift  | 22 ++++++++++++++++++
 4 files changed, 79 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 72263df..0c93234 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -262,6 +262,39 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
   }
 
+  /// Returns a new Dataset with columns dropped. This is a no-op if the schema doesn't contain the column names.
+  /// - Parameter cols: Column names.
+  /// - Returns: A ``DataFrame`` with a subset of columns.
+  public func drop(_ cols: String...) -> DataFrame {
+    return DataFrame(spark: self.spark, plan: SparkConnectClient.getDrop(self.plan.root, cols))
+  }
+
+  /// Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain existingName.
+  /// - Parameters:
+  ///   - existingName: An existing column name to be renamed.
+  ///   - newName: A new column name.
+  /// - Returns: A ``DataFrame`` with the renamed column.
+  public func withColumnRenamed(_ existingName: String, _ newName: String) -> DataFrame {
+    return withColumnRenamed([existingName: newName])
+  }
+
+  /// Returns a new Dataset with columns renamed. This is a no-op if the schema doesn't contain the existing names.
+  /// - Parameters:
+  ///   - colNames: A list of existing column names to be renamed.
+  ///   - newColNames: A list of new column names.
+  /// - Returns: A ``DataFrame`` with the renamed columns.
+  public func withColumnRenamed(_ colNames: [String], _ newColNames: [String]) -> DataFrame {
+    let dic = Dictionary(uniqueKeysWithValues: zip(colNames, newColNames))
+    return DataFrame(spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, dic))
+  }
+
+  /// Returns a new Dataset with columns renamed. This is a no-op if the schema doesn't contain the existing names.
+  /// - Parameter colsMap: A dictionary of existing column names and new column names.
+  /// - Returns: A ``DataFrame`` with the renamed columns.
+  public func withColumnRenamed(_ colsMap: [String: String]) -> DataFrame {
+    return DataFrame(spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, colsMap))
+  }
+
   /// Return a new ``DataFrame`` with filtered rows using the given expression.
   /// - Parameter conditionExpr: A string to filter.
   /// - Returns: A ``DataFrame`` with subset of rows.
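[Editor's note] For readers skimming the diff, an illustrative sketch of how the new APIs are called; this is not part of the commit, and it mirrors the session and column setup used in the tests further below:

    let spark = try await SparkSession.builder.getOrCreate()
    let df = try await spark.sql("SELECT 1 a, 2 b, 3 c, 4 d")

    // `drop` removes the named columns; unknown names are silently ignored.
    let dropped = await df.drop("a", "b")
    print(try await dropped.columns)  // ["c", "d"]

    // `withColumnRenamed` accepts a single pair, parallel lists, or a dictionary.
    let renamed = await df.withColumnRenamed(["a": "x", "c": "z"])
    print(try await renamed.columns)  // ["x", "b", "z", "d"]

    await spark.stop()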
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 2acbd6e..d76f533 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -335,6 +335,17 @@ public actor SparkConnectClient {
     return plan
   }
 
+  static func getWithColumnRenamed(_ child: Relation, _ colsMap: [String: String]) -> Plan {
+    var withColumnsRenamed = WithColumnsRenamed()
+    withColumnsRenamed.input = child
+    withColumnsRenamed.renameColumnsMap = colsMap
+    var relation = Relation()
+    relation.withColumnsRenamed = withColumnsRenamed
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   static func getFilter(_ child: Relation, _ conditionExpr: String) -> Plan {
     var filter = Filter()
     filter.input = child
@@ -346,6 +357,17 @@ public actor SparkConnectClient {
     return plan
   }
 
+  static func getDrop(_ child: Relation, _ columnNames: [String]) -> Plan {
+    var drop = Drop()
+    drop.input = child
+    drop.columnNames = columnNames
+    var relation = Relation()
+    relation.drop = drop
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   static func getSort(_ child: Relation, _ cols: [String]) -> Plan {
     var sort = Sort()
     sort.input = child
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 766ad02..0934a05 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -23,6 +23,7 @@ typealias ConfigRequest = Spark_Connect_ConfigRequest
 typealias DataSource = Spark_Connect_Read.DataSource
 typealias DataType = Spark_Connect_DataType
 typealias DayTimeInterval = Spark_Connect_DataType.DayTimeInterval
+typealias Drop = Spark_Connect_Drop
 typealias ExecutePlanRequest = Spark_Connect_ExecutePlanRequest
 typealias ExecutePlanResponse = Spark_Connect_ExecutePlanResponse
 typealias ExplainMode = AnalyzePlanRequest.Explain.ExplainMode
@@ -47,5 +48,6 @@ typealias StructType = Spark_Connect_DataType.Struct
 typealias Tail = Spark_Connect_Tail
 typealias UserContext = Spark_Connect_UserContext
 typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
+typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
 typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval
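[Editor's note] One behavioral detail of the list-based `withColumnRenamed` overload above: `zip` truncates to the shorter of the two arrays, and `Dictionary(uniqueKeysWithValues:)` traps at runtime if `colNames` contains duplicates. A standalone Swift sketch of that zip-to-dictionary step, with illustrative values independent of the library:

    let colNames = ["a", "c", "d"]
    let newColNames = ["x", "z"]  // shorter list, so "d" is dropped by zip
    let dic = Dictionary(uniqueKeysWithValues: zip(colNames, newColNames))
    print(dic)  // ["a": "x", "c": "z"] (key order is unspecified)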
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 1e602c5..aee0c93 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -172,6 +172,28 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func withColumnRenamed() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).withColumnRenamed("id", "id2").columns == ["id2"])
+    let df = try await spark.sql("SELECT 1 a, 2 b, 3 c, 4 d")
+    #expect(try await df.withColumnRenamed(["a": "x", "c": "z"]).columns == ["x", "b", "z", "d"])
+    // Ignore unknown column names.
+    #expect(try await df.withColumnRenamed(["unknown": "x"]).columns == ["a", "b", "c", "d"])
+    await spark.stop()
+  }
+
+  @Test
+  func drop() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT 1 a, 2 b, 3 c, 4 d")
+    #expect(try await df.drop("a").columns == ["b", "c", "d"])
+    #expect(try await df.drop("b", "c").columns == ["a", "d"])
+    // Ignore unknown column names.
+    #expect(try await df.drop("x", "y").columns == ["a", "b", "c", "d"])
+    await spark.stop()
+  }
+
   @Test
   func filter() async throws {
     let spark = try await SparkSession.builder.getOrCreate()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org