This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new cab692c  [SPARK-51917] Add `DataFrameWriterV2` actor
cab692c is described below

commit cab692c1eb0a1c0ef661f2dfeab8de2b56e4cfbe
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat Apr 26 22:27:16 2025 -0700

    [SPARK-51917] Add `DataFrameWriterV2` actor

    ### What changes were proposed in this pull request?

    This PR aims to add `DataFrameWriterV2` actor.

    ### Why are the changes needed?

    For feature parity.

    ### Does this PR introduce _any_ user-facing change?

    No behavior change because this is an additional API.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #91 from dongjoon-hyun/SPARK-51917.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift               |   7 +
 Sources/SparkConnect/DataFrameWriterV2.swift       | 152 +++++++++++++++++++++
 Sources/SparkConnect/TypeAliases.swift             |   1 +
 .../SparkConnectTests/DataFrameWriterV2Tests.swift |  89 ++++++++++++
 4 files changed, 249 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 7c81105..1fbf2b4 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -852,4 +852,11 @@ public actor DataFrame: Sendable {
       return DataFrameWriter(df: self)
     }
   }
+
+  /// Create a write configuration builder for v2 sources.
+  /// - Parameter table: A table name, e.g., `catalog.db.table`.
+  /// - Returns: A ``DataFrameWriterV2`` instance.
+  public func writeTo(_ table: String) -> DataFrameWriterV2 {
+    return DataFrameWriterV2(table, self)
+  }
 }

diff --git a/Sources/SparkConnect/DataFrameWriterV2.swift b/Sources/SparkConnect/DataFrameWriterV2.swift
new file mode 100644
index 0000000..3e95892
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameWriterV2.swift
@@ -0,0 +1,152 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// Interface used to write a ``DataFrame`` to external storage using the v2 API.
+public actor DataFrameWriterV2: Sendable {
+
+  let tableName: String
+
+  let df: DataFrame
+
+  var provider: String? = nil
+
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  var tableProperties: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  var partitioningColumns: [Spark_Connect_Expression] = []
+
+  var clusteringColumns: [String]? = nil
+
+  init(_ table: String, _ df: DataFrame) {
+    self.tableName = table
+    self.df = df
+  }
+
+  /// Specifies a provider for the underlying output data source. Spark's default catalog supports
+  /// "orc", "json", etc.
+  /// - Parameter provider: <#provider description#>
+  public func using(_ provider: String) -> DataFrameWriterV2 {
+    self.provider = provider
+    return self
+  }
+
+  /// Adds an output option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataFrameWriter`.
+  public func option(_ key: String, _ value: String) -> DataFrameWriterV2 {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Add a table property.
+  /// - Parameters:
+  ///   - property: A property name.
+  ///   - value: A property value.
+  public func tableProperty(property: String, value: String) -> DataFrameWriterV2 {
+    self.tableProperties[property] = value
+    return self
+  }
+
+  /// Partition the output table created by `create`, `createOrReplace`, or `replace` using the
+  /// given columns or transforms.
+  /// - Parameter columns: Columns to partition
+  /// - Returns: A ``DataFrameWriterV2``.
+  public func partitionBy(_ columns: String...) -> DataFrameWriterV2 {
+    self.partitioningColumns = columns.map {
+      var expr = Spark_Connect_Expression()
+      expr.expressionString = $0.toExpressionString
+      return expr
+    }
+    return self
+  }
+
+  /// Clusters the output by the given columns on the storage. The rows with matching values in the
+  /// specified clustering columns will be consolidated within the same group.
+  /// - Parameter columns: Columns to cluster
+  /// - Returns: A ``DataFrameWriterV2``.
+  public func clusterBy(_ columns: String...) -> DataFrameWriterV2 {
+    self.clusteringColumns = columns
+    return self
+  }
+
+  /// Create a new table from the contents of the data frame.
+  public func create() async throws {
+    try await executeWriteOperation(.create)
+  }
+
+  /// Replace an existing table with the contents of the data frame.
+  public func replace() async throws {
+    try await executeWriteOperation(.replace)
+  }
+
+  /// Create a new table or replace an existing table with the contents of the data frame.
+  public func createOrReplace() async throws {
+    try await executeWriteOperation(.createOrReplace)
+  }
+
+  /// Append the contents of the data frame to the output table.
+  public func append() async throws {
+    try await executeWriteOperation(.append)
+  }
+
+  /// Overwrite rows matching the given filter condition with the contents of the ``DataFrame`` in the
+  /// output table.
+  /// - Parameter condition: A filter condition.
+  public func overwrite(condition: String) async throws {
+    try await executeWriteOperation(.overwrite)
+  }
+
+  /// Overwrite all partition for which the ``DataFrame`` contains at least one row with the contents
+  /// of the data frame in the output table.
+  /// This operation is equivalent to Hive's `INSERT OVERWRITE ... PARTITION`, which replaces
+  /// partitions dynamically depending on the contents of the ``DataFrame``.
+  public func overwritePartitions() async throws {
+    try await executeWriteOperation(.overwritePartitions)
+  }
+
+  private func executeWriteOperation(_ mode: WriteOperationV2.Mode) async throws {
+    var write = WriteOperationV2()
+
+    let plan = await self.df.getPlan() as! Plan
+    write.input = plan.root
+    write.tableName = self.tableName
+    if let provider = self.provider {
+      write.provider = provider
+    }
+    write.partitioningColumns = self.partitioningColumns
+    if let clusteringColumns = self.clusteringColumns {
+      write.clusteringColumns = clusteringColumns
+    }
+    for option in self.extraOptions.toStringDictionary() {
+      write.options[option.key] = option.value
+    }
+    for property in self.tableProperties.toStringDictionary() {
+      write.tableProperties[property.key] = property.value
+    }
+    write.mode = mode
+
+    var command = Spark_Connect_Command()
+    command.writeOperationV2 = write
+    _ = try await df.spark.client.execute(df.spark.sessionID, command)
+  }
+}

diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index ba35fb6..60f0fb8 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -60,4 +60,5 @@ typealias UserContext = Spark_Connect_UserContext
 typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
 typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
+typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
 typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval

diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
new file mode 100644
index 0000000..e6e7aa1
--- /dev/null
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -0,0 +1,89 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
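As a quick usage reference (this snippet is not part of the commit above), the sketch below shows how the new v2 writer might be driven end to end. It assumes a reachable Spark Connect server behind `SparkSession.builder.getOrCreate()` and uses `sample_table` as a purely illustrative table name.

    import SparkConnect

    // Obtain a session (assumes a running Spark Connect server).
    let spark = try await SparkSession.builder.getOrCreate()

    // Build a v2 writer from a DataFrame, pick a provider, and create the table.
    // Additional builder calls such as partitionBy(...) or clusterBy(...) can be
    // chained the same way before calling create().
    let writer = try await spark.range(3)
      .writeTo("sample_table")
      .using("orc")
      .tableProperty(property: "owner", value: "me")
    try await writer.create()

    // The table is now visible through the catalog; later runs could call
    // append() or createOrReplace() on a fresh writer instead.
    print(try await spark.table("sample_table").count())  // 3

    await spark.stop()
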
+// + +import Foundation +import SparkConnect +import Testing + +/// A test suite for `DataFrameWriterV2` +struct DataFrameWriterV2Tests { + + @Test + func create() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + let write = try await spark.range(2).writeTo(tableName).using("orc") + try await write.create() + #expect(try await spark.table(tableName).count() == 2) + try await #require(throws: Error.self) { + try await write.create() + } + }) + await spark.stop() + } + + @Test + func createOrReplace() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + let write = try await spark.range(2).writeTo(tableName).using("orc") + try await write.create() + #expect(try await spark.table(tableName).count() == 2) + // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4 + try await #require(throws: Error.self) { + try await write.createOrReplace() + } + }) + await spark.stop() + } + + @Test + func replace() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + let write = try await spark.range(2).writeTo(tableName).using("orc") + try await write.create() + #expect(try await spark.table(tableName).count() == 2) + // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4 + try await #require(throws: Error.self) { + try await write.replace() + } + }) + await spark.stop() + } + + @Test + func append() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTable(spark, tableName)({ + let write = try await spark.range(2).writeTo(tableName).using("orc") + try await write.create() + #expect(try await spark.table(tableName).count() == 2) + // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4 + try await #require(throws: Error.self) { + try await write.append() + } + }) + await spark.stop() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org