This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push: new 6687d72 [SPARK-51792] Support `saveAsTable` and `insertInto` 6687d72 is described below commit 6687d72295e0f33a4220caa8cf20faa898d072d6 Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Mon Apr 14 17:12:24 2025 +0900 [SPARK-51792] Support `saveAsTable` and `insertInto` ### What changes were proposed in this pull request? This PR aims to support `saveAsTable` and `insertInto` APIs in `DataFrameWriter`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No, this is a new addition. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #56 from dongjoon-hyun/SPARK-51792. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- Sources/SparkConnect/DataFrameWriter.swift | 46 ++++++++++++++++++---- Tests/SparkConnectTests/DataFrameWriterTests.swift | 43 ++++++++++++++++++++ 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/Sources/SparkConnect/DataFrameWriter.swift b/Sources/SparkConnect/DataFrameWriter.swift index 6846df2..9a142a5 100644 --- a/Sources/SparkConnect/DataFrameWriter.swift +++ b/Sources/SparkConnect/DataFrameWriter.swift @@ -113,16 +113,48 @@ public actor DataFrameWriter: Sendable { } private func saveInternal(_ path: String?) async throws { - var write = WriteOperation() + try await executeWriteOperation({ + var write = WriteOperation() + if let path = path { + write.path = path + } + return write + }) + } + + /// Saves the content of the ``DataFrame`` as the specified table. + /// - Parameter tableName: A table name. + public func saveAsTable(_ tableName: String) async throws { + try await executeWriteOperation({ + var write = WriteOperation() + write.table.tableName = tableName + write.table.saveMethod = .saveAsTable + return write + }) + } + + /// Inserts the content of the ``DataFrame`` to the specified table. It requires that the schema of + /// the ``DataFrame`` is the same as the schema of the table. Unlike ``saveAsTable``, + /// ``insertInto`` ignores the column names and just uses position-based resolution. + /// - Parameter tableName: A table name. + public func insertInto(_ tableName: String) async throws { + try await executeWriteOperation({ + var write = WriteOperation() + write.table.tableName = tableName + write.table.saveMethod = .insertInto + return write + }) + } + + private func executeWriteOperation(_ f: () -> WriteOperation) async throws { + var write = f() + + // Cannot both be set + assert(!(!write.path.isEmpty && !write.table.tableName.isEmpty)) + let plan = await self.df.getPlan() as! Plan write.input = plan.root write.mode = self.saveMode.toSaveMode - if let path = path { - write.path = path - } - - // Cannot both be set - // require(!(builder.hasPath && builder.hasTable)) if let source = self.source { write.source = source diff --git a/Tests/SparkConnectTests/DataFrameWriterTests.swift b/Tests/SparkConnectTests/DataFrameWriterTests.swift index d7fde78..da6d190 100644 --- a/Tests/SparkConnectTests/DataFrameWriterTests.swift +++ b/Tests/SparkConnectTests/DataFrameWriterTests.swift @@ -101,6 +101,49 @@ struct DataFrameWriterTests { await spark.stop() } + @Test + func saveAsTable() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + try await spark.range(1).write.saveAsTable(tableName) + #expect(try await spark.read.table(tableName).count() == 1) + + try await #require(throws: Error.self) { + try await spark.range(1).write.saveAsTable(tableName) + } + + try await spark.range(1).write.mode("overwrite").saveAsTable(tableName) + #expect(try await spark.read.table(tableName).count() == 1) + + try await spark.range(1).write.mode("append").saveAsTable(tableName) + #expect(try await spark.read.table(tableName).count() == 2) + }) + await spark.stop() + } + + @Test + func insertInto() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + // Table doesn't exist. + try await #require(throws: Error.self) { + try await spark.range(1).write.insertInto(tableName) + } + + try await spark.range(1).write.saveAsTable(tableName) + #expect(try await spark.read.table(tableName).count() == 1) + + try await spark.range(1).write.insertInto(tableName) + #expect(try await spark.read.table(tableName).count() == 2) + + try await spark.range(1).write.insertInto(tableName) + #expect(try await spark.read.table(tableName).count() == 3) + }) + await spark.stop() + } + @Test func partitionBy() async throws { let tmpDir = "/tmp/" + UUID().uuidString --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org