This is an automated email from the ASF dual-hosted git repository.
dongjoon-hyun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new db0806c [SPARK-57302] Support `withColumn/withColumns` for `DataFrame`
db0806c is described below
commit db0806c293a7ea279dc82b6a6843f348b276ad78
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 7 11:35:37 2026 -0700
[SPARK-57302] Support `withColumn/withColumns` for `DataFrame`
### What changes were proposed in this pull request?
This PR adds `withColumn` and `withColumns` transformations to `DataFrame`,
exposing the Spark Connect `WithColumns` relation that was already defined in
the generated protobuf but not surfaced in the Swift API.
New public APIs in `DataFrame+Transformations.swift`:
```swift
public func withColumn(_ colName: String, _ expr: String) -> DataFrame
public func withColumns(_ colsMap: [String: String]) -> DataFrame
```
- `withColumn` adds a new column or replaces an existing column with the
same name; it delegates to `withColumns` with a single-entry map.
- The column expression is provided as a SQL expression string and parsed
on the server, consistent with the existing `filter(_:)` / `selectExpr(_:)`
APIs (this client has no `Column` type).
- A static plan builder `SparkConnectClient.getWithColumns(_:_:)`
constructs the `WithColumns` relation by mapping each `(name, expr)` pair to an
`Expression.Alias`, mirroring the sibling `getWithColumnRenamed(_:_:)`.
- Adds the `WithColumns` type alias in `TypeAliases.swift`, matching the
existing `WithColumnsRenamed` alias.
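
The delegation and alias mapping described above can be sketched without a Spark Connect server. This is a minimal illustration, not the client's implementation: `AliasSketch`, `WithColumnsSketch`, `buildWithColumns`, and `buildWithColumn` are hypothetical stand-ins for the generated protobuf types and the plan builder. The expression is kept as a plain string here, whereas the real builder converts it to an expression before sending it to the server. Sorting by name is an addition in this sketch to make the result deterministic, because Swift's `Dictionary` has no defined iteration order.

```swift
// Hypothetical stand-ins for the generated protobuf types; not the real API.
struct AliasSketch: Equatable {
  var name: [String]  // the alias name is stored as a list of name parts
  var expr: String    // assumption: kept as a raw SQL string in this sketch
}

struct WithColumnsSketch: Equatable {
  var aliases: [AliasSketch] = []
}

// One alias per (name, expr) pair, in the same shape as the plan builder.
// Sorting by key is specific to this sketch and only makes the order stable.
func buildWithColumns(_ colsMap: [String: String]) -> WithColumnsSketch {
  var relation = WithColumnsSketch()
  relation.aliases = colsMap
    .sorted { $0.key < $1.key }
    .map { AliasSketch(name: [$0.key], expr: $0.value) }
  return relation
}

// The single-column form delegates to the map form with a one-entry dictionary.
func buildWithColumn(_ colName: String, _ expr: String) -> WithColumnsSketch {
  return buildWithColumns([colName: expr])
}

let single = buildWithColumn("b", "id + 1")
let multiple = buildWithColumns(["c": "id * 2", "b": "id + 1"])
```
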
### Why are the changes needed?
`DataFrame.withColumn` / `withColumns` are among the most commonly used
PySpark and Spark SQL APIs for deriving or replacing columns from expressions.
The Swift client previously supported only column *renaming*
(`withColumnRenamed`) and had no way to add or replace a column from a computed
expression. This change improves API parity with PySpark/Spark SQL.
### Does this PR introduce _any_ user-facing change?
Yes. This adds two new public `DataFrame` APIs.
```swift
let df = try await spark.range(1) // columns: [id]
try await df.withColumn("b", "id + 1").show() // columns: [id, b]
try await df.withColumns(["b": "id + 1",
"c": "id * 2"]).show() // columns: [id, b, c]
```
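
The column layout produced by adding versus replacing can be modeled locally. The sketch below only tracks column names and does not evaluate expressions or contact a server. `applyWithColumns` is a hypothetical helper, and it assumes exact, case-sensitive name matching, which may differ from how the server resolves names. It takes an ordered array of pairs rather than a dictionary, so it makes no claim about the relative order of several newly added columns when a `Dictionary` is passed to `withColumns`.

```swift
// Local model of the column layout only; hypothetical helper, not client API.
func applyWithColumns(
  _ columns: [String], _ newColumns: [(name: String, expr: String)]
) -> [String] {
  var result = columns
  for column in newColumns where !result.contains(column.name) {
    // A name that is not present yet is appended at the end.
    result.append(column.name)
  }
  // A name that already exists keeps its position; only its values would change.
  return result
}

let added = applyWithColumns(["id"], [(name: "b", expr: "id + 1")])
let replaced = applyWithColumns(["id"], [(name: "id", expr: "id + 1")])
print(added)
print(replaced)
```
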
### How was this patch tested?
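Pass the CIs with the newly added `withColumn` test case in `DataFrameTests.swift`, which covers adding a column, replacing an existing column, and replacing multiple columns through `withColumns`.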
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
Closes #405 from dongjoon-hyun/SPARK-57302.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrame+Transformations.swift | 19 +++++++++++++++++++
Sources/SparkConnect/SparkConnectClient.swift | 12 ++++++++++++
Sources/SparkConnect/TypeAliases.swift | 1 +
Tests/SparkConnectTests/DataFrameTests.swift | 15 +++++++++++++++
4 files changed, 47 insertions(+)
diff --git a/Sources/SparkConnect/DataFrame+Transformations.swift b/Sources/SparkConnect/DataFrame+Transformations.swift
index 0ba0a2a..2f21d29 100644
--- a/Sources/SparkConnect/DataFrame+Transformations.swift
+++ b/Sources/SparkConnect/DataFrame+Transformations.swift
@@ -142,6 +142,25 @@ extension DataFrame {
spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, colsMap))
}
+ /// Returns a new ``DataFrame`` by adding a column or replacing the existing column that has the
+ /// same name.
+ /// - Parameters:
+ /// - colName: A new column name.
+ /// - expr: A SQL expression string for the new column.
+ /// - Returns: A ``DataFrame`` with the new or replaced column.
+ public func withColumn(_ colName: String, _ expr: String) -> DataFrame {
+ return withColumns([colName: expr])
+ }
+
+ /// Returns a new ``DataFrame`` by adding columns or replacing the existing columns that have the
+ /// same names.
+ /// - Parameter colsMap: A dictionary of column name and SQL expression string.
+ /// - Returns: A ``DataFrame`` with the new or replaced columns.
+ public func withColumns(_ colsMap: [String: String]) -> DataFrame {
+ return DataFrame(
+ spark: self.spark, plan: SparkConnectClient.getWithColumns(self.plan.root, colsMap))
+ }
+
// MARK: - Filtering and Sorting
/// Filters rows using the given condition.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index ae2daa1..d7c1c05 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -557,6 +557,18 @@ public actor SparkConnectClient {
return createPlan { $0.withColumnsRenamed = withColumnsRenamed }
}
+ static func getWithColumns(_ child: Relation, _ colsMap: [String: String]) -> Plan {
+ var withColumns = WithColumns()
+ withColumns.input = child
+ withColumns.aliases = colsMap.map { (name, expr) in
+ var alias = Spark_Connect_Expression.Alias()
+ alias.expr = expr.toExpression
+ alias.name = [name]
+ return alias
+ }
+ return createPlan { $0.withColumns = withColumns }
+ }
+
static func getFilter(_ child: Relation, _ conditionExpr: String) -> Plan {
var filter = Filter()
filter.input = child
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 3f15f56..d70d161 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -67,6 +67,7 @@ typealias StructType = Spark_Connect_DataType.Struct
typealias Tail = Spark_Connect_Tail
typealias UserContext = Spark_Connect_UserContext
typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
+typealias WithColumns = Spark_Connect_WithColumns
typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
typealias WriteOperation = Spark_Connect_WriteOperation
typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index c745bb9..139c0ed 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -283,6 +283,21 @@ struct DataFrameTests {
await spark.stop()
}
+ @Test
+ func withColumn() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+ // Add a new column.
+ #expect(try await spark.range(1).withColumn("b", "id + 1").columns == ["id", "b"])
+ #expect(try await spark.range(1).withColumn("b", "id + 1").collect() == [Row(0, 1)])
+ // Replace an existing column in place.
+ #expect(try await spark.range(1).withColumn("id", "id + 1").columns == ["id"])
+ #expect(try await spark.range(1).withColumn("id", "id + 1").collect() == [Row(1)])
+ // Map overload: replace multiple existing columns (positions are preserved).
+ let df = try await spark.sql("SELECT 1 a, 2 b")
+ #expect(try await df.withColumns(["a": "a + 10", "b": "b + 20"]).collect() == [Row(11, 22)])
+ await spark.stop()
+ }
+
@Test
func drop() async throws {
let spark = try await SparkSession.builder.getOrCreate()