This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new cab692c  [SPARK-51917] Add `DataFrameWriterV2` actor
cab692c is described below

commit cab692c1eb0a1c0ef661f2dfeab8de2b56e4cfbe
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat Apr 26 22:27:16 2025 -0700

    [SPARK-51917] Add `DataFrameWriterV2` actor
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `DataFrameWriterV2` actor.
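
    For example, the new API can be used roughly as follows. This is a usage
    sketch, not code from this patch; the table name, provider, and option are
    illustrative and assume a running Spark Connect server and a catalog that
    supports v2 table creation.

    ```swift
    import SparkConnect

    // Inside an async throwing context.
    let spark = try await SparkSession.builder.getOrCreate()

    // Configure a v2 writer for an illustrative target table:
    // pick the provider and add an output option.
    let writer = try await spark.range(3)
      .writeTo("testcat.db.target")
      .using("orc")
      .option("compression", "zstd")

    // Create the target table from the contents of the DataFrame.
    try await writer.create()

    await spark.stop()
    ```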
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change because this is an additional API.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #91 from dongjoon-hyun/SPARK-51917.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift               |   7 +
 Sources/SparkConnect/DataFrameWriterV2.swift       | 152 +++++++++++++++++++++
 Sources/SparkConnect/TypeAliases.swift             |   1 +
 .../SparkConnectTests/DataFrameWriterV2Tests.swift |  89 ++++++++++++
 4 files changed, 249 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 7c81105..1fbf2b4 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -852,4 +852,11 @@ public actor DataFrame: Sendable {
       return DataFrameWriter(df: self)
     }
   }
+
+  /// Create a write configuration builder for v2 sources.
+  /// - Parameter table: A table name, e.g., `catalog.db.table`.
+  /// - Returns: A ``DataFrameWriterV2`` instance.
+  public func writeTo(_ table: String) -> DataFrameWriterV2 {
+    return DataFrameWriterV2(table, self)
+  }
 }
diff --git a/Sources/SparkConnect/DataFrameWriterV2.swift b/Sources/SparkConnect/DataFrameWriterV2.swift
new file mode 100644
index 0000000..3e95892
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameWriterV2.swift
@@ -0,0 +1,152 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// Interface used to write a ``DataFrame`` to external storage using the v2 API.
+public actor DataFrameWriterV2: Sendable {
+
+  let tableName: String
+
+  let df: DataFrame
+
+  var provider: String? = nil
+
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  var tableProperties: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  var partitioningColumns: [Spark_Connect_Expression] = []
+
+  var clusteringColumns: [String]? = nil
+
+  init(_ table: String, _ df: DataFrame) {
+    self.tableName = table
+    self.df = df
+  }
+
+  /// Specifies a provider for the underlying output data source. Spark's default catalog supports
+  /// "orc", "json", etc.
+  /// - Parameter provider: A provider name such as "orc" or "json".
+  public func using(_ provider: String) -> DataFrameWriterV2 {
+    self.provider = provider
+    return self
+  }
+
+  /// Adds an output option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A ``DataFrameWriterV2``.
+  public func option(_ key: String, _ value: String) -> DataFrameWriterV2 {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Add a table property.
+  /// - Parameters:
+  ///   - property: A property name.
+  ///   - value: A property value.
+  public func tableProperty(property: String, value: String) -> DataFrameWriterV2 {
+    self.tableProperties[property] = value
+    return self
+  }
+
+  /// Partition the output table created by `create`, `createOrReplace`, or `replace` using the
+  /// given columns or transforms.
+  /// - Parameter columns: Columns to partition
+  /// - Returns: A ``DataFrameWriterV2``.
+  public func partitionBy(_ columns: String...) -> DataFrameWriterV2 {
+    self.partitioningColumns = columns.map {
+      var expr = Spark_Connect_Expression()
+      expr.expressionString = $0.toExpressionString
+      return expr
+    }
+    return self
+  }
+
+  /// Clusters the output by the given columns on the storage. The rows with matching values in the
+  /// specified clustering columns will be consolidated within the same group.
+  /// - Parameter columns: Columns to cluster
+  /// - Returns: A ``DataFrameWriterV2``.
+  public func clusterBy(_ columns: String...) -> DataFrameWriterV2 {
+    self.clusteringColumns = columns
+    return self
+  }
+
+  /// Create a new table from the contents of the data frame.
+  public func create() async throws {
+    try await executeWriteOperation(.create)
+  }
+
+  /// Replace an existing table with the contents of the data frame.
+  public func replace() async throws {
+    try await executeWriteOperation(.replace)
+  }
+
+  /// Create a new table or replace an existing table with the contents of the data frame.
+  public func createOrReplace() async throws {
+    try await executeWriteOperation(.createOrReplace)
+  }
+
+  /// Append the contents of the data frame to the output table.
+  public func append() async throws {
+    try await executeWriteOperation(.append)
+  }
+
+  /// Overwrite rows matching the given filter condition with the contents of the ``DataFrame`` in the
+  /// output table.
+  /// - Parameter condition: A filter condition.
+  public func overwrite(condition: String) async throws {
+    try await executeWriteOperation(.overwrite)
+  }
+
+  /// Overwrite all partitions for which the ``DataFrame`` contains at least one row with the contents
+  /// of the data frame in the output table.
+  /// This operation is equivalent to Hive's `INSERT OVERWRITE ... PARTITION`, which replaces
+  /// partitions dynamically depending on the contents of the ``DataFrame``.
+  /// partitions dynamically depending on the contents of the ``DataFrame``.
+  public func overwritePartitions() async throws {
+    try await executeWriteOperation(.overwritePartitions)
+  }
+
+  private func executeWriteOperation(_ mode: WriteOperationV2.Mode) async throws {
+    var write = WriteOperationV2()
+
+    let plan = await self.df.getPlan() as! Plan
+    write.input = plan.root
+    write.tableName = self.tableName
+    if let provider = self.provider {
+      write.provider = provider
+    }
+    write.partitioningColumns = self.partitioningColumns
+    if let clusteringColumns = self.clusteringColumns {
+      write.clusteringColumns = clusteringColumns
+    }
+    for option in self.extraOptions.toStringDictionary() {
+      write.options[option.key] = option.value
+    }
+    for property in self.tableProperties.toStringDictionary() {
+      write.tableProperties[property.key] = property.value
+    }
+    write.mode = mode
+
+    var command = Spark_Connect_Command()
+    command.writeOperationV2 = write
+    _ = try await df.spark.client.execute(df.spark.sessionID, command)
+  }
+}
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index ba35fb6..60f0fb8 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -60,4 +60,5 @@ typealias UserContext = Spark_Connect_UserContext
 typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
 typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
+typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
 typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval
diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
new file mode 100644
index 0000000..e6e7aa1
--- /dev/null
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -0,0 +1,89 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import SparkConnect
+import Testing
+
+/// A test suite for `DataFrameWriterV2`
+struct DataFrameWriterV2Tests {
+
+  @Test
+  func create() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTable(spark, tableName)({
+      let write = try await spark.range(2).writeTo(tableName).using("orc")
+      try await write.create()
+      #expect(try await spark.table(tableName).count() == 2)
+      try await #require(throws: Error.self) {
+        try await write.create()
+      }
+    })
+    await spark.stop()
+  }
+
+  @Test
+  func createOrReplace() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTable(spark, tableName)({
+      let write = try await spark.range(2).writeTo(tableName).using("orc")
+      try await write.create()
+      #expect(try await spark.table(tableName).count() == 2)
+      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
+      try await #require(throws: Error.self) {
+        try await write.createOrReplace()
+      }
+    })
+    await spark.stop()
+  }
+
+  @Test
+  func replace() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTable(spark, tableName)({
+      let write = try await spark.range(2).writeTo(tableName).using("orc")
+      try await write.create()
+      #expect(try await spark.table(tableName).count() == 2)
+      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
+      try await #require(throws: Error.self) {
+        try await write.replace()
+      }
+    })
+    await spark.stop()
+  }
+
+  @Test
+  func append() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTable(spark, tableName)({
+      let write = try await spark.range(2).writeTo(tableName).using("orc")
+      try await write.create()
+      #expect(try await spark.table(tableName).count() == 2)
+      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
+      try await #require(throws: Error.self) {
+        try await write.append()
+      }
+    })
+    await spark.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org