This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new a39c346  [SPARK-51969] Support `createTable` and `(table|function)Exists` in `Catalog`
a39c346 is described below

commit a39c3461e5cdc37b2f144eb35e1bb14876b15e06
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Apr 30 17:56:57 2025 -0700

    [SPARK-51969] Support `createTable` and `(table|function)Exists` in `Catalog`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `createTable`, `tableExists`, and `functionExists` APIs in `Catalog`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #97 from dongjoon-hyun/SPARK-51969.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/Catalog.swift         | 108 +++++++++++++++++++++++++++++
 Tests/SparkConnectTests/CatalogTests.swift |  44 ++++++++++++
 2 files changed, 152 insertions(+)

diff --git a/Sources/SparkConnect/Catalog.swift b/Sources/SparkConnect/Catalog.swift
index 98f9a1b..1fe0d8f 100644
--- a/Sources/SparkConnect/Catalog.swift
+++ b/Sources/SparkConnect/Catalog.swift
@@ -200,6 +200,114 @@ public actor Catalog: Sendable {
     return try await self.listDatabases(pattern: dbName).count > 0
   }
 
+  /// Creates a table, optionally backed by the given path and data source, and returns the
+  /// corresponding ``DataFrame``.
+  ///
+  /// This method only builds a catalog request through `getDataFrame`; it does not execute
+  /// anything itself. NOTE(review): the table is presumably created only when an action such
+  /// as `count()` runs on the returned ``DataFrame`` (the tests do this) — confirm before
+  /// discarding the result.
+  ///
+  /// - Parameters:
+  ///   - tableName: A qualified or unqualified name that designates a table. If no database
+  ///     identifier is provided, it refers to a table in the current database.
+  ///   - path: An optional path to load the table from. When given, it is sent as the `path`
+  ///     option and takes precedence over any `path` entry in `options`.
+  ///   - source: An optional data source name (for example `orc` or `parquet`). When omitted,
+  ///     no source is sent (presumably the server's default data source applies).
+  ///   - description: An optional table description. Sent only when provided.
+  ///   - options: Additional table options.
+  /// - Returns: A ``DataFrame``.
+  public func createTable(
+    _ tableName: String,
+    _ path: String? = nil,
+    source: String? = nil,
+    description: String? = nil,
+    options: [String: String]? = nil
+  ) -> DataFrame {
+    return getDataFrame({
+      var createTable = Spark_Connect_CreateTable()
+      createTable.tableName = tableName
+      if let source {
+        createTable.source = source
+      }
+      // `description` is an optional proto field: only set it when a value is supplied,
+      // mirroring `source`, instead of always sending an explicit empty string.
+      if let description {
+        createTable.description_p = description
+      }
+      if let options {
+        // The options map starts empty, so assignment equals copying each entry.
+        createTable.options = options
+      }
+      // Applied after `options` so an explicit `path` argument wins over `options["path"]`.
+      if let path {
+        createTable.options["path"] = path
+      }
+      var catalog = Spark_Connect_Catalog()
+      catalog.createTable = createTable
+      return catalog
+    })
+  }
+
+  /// Returns whether a table or view named `tableName` exists. This can be either a temporary
+  /// view or a table/view.
+  /// - Parameter tableName: A qualified or unqualified name that designates a table/view. It
+  ///   follows the same resolution rule as SQL: temporary views are searched first, then
+  ///   tables/views in the current database (namespace).
+  /// - Returns: `true` if it exists.
+  /// - Note: The answer is read from the first column of the first returned row as a `String`
+  ///   and compared with `"true"`; an empty result or a non-`String` value traps at runtime.
+  public func tableExists(_ tableName: String) async throws -> Bool {
+    let df = getDataFrame {
+      var request = Spark_Connect_TableExists()
+      request.tableName = tableName
+      var catalog = Spark_Connect_Catalog()
+      catalog.tableExists = request
+      return catalog
+    }
+    let rows = try await df.collect()
+    let answer = try rows.first!.get(0) as! String
+    return answer == "true"
+  }
+
+  /// Returns whether a table or view named `tableName` exists in the database `dbName`. This
+  /// can be either a temporary view or a table/view.
+  /// - Parameters:
+  ///   - dbName: An unqualified name that designates a database.
+  ///   - tableName: An unqualified name that designates a table.
+  /// - Returns: `true` if it exists.
+  /// - Note: The answer is read from the first column of the first returned row as a `String`
+  ///   and compared with `"true"`; an empty result or a non-`String` value traps at runtime.
+  public func tableExists(_ dbName: String, _ tableName: String) async throws -> Bool {
+    let df = getDataFrame {
+      var request = Spark_Connect_TableExists()
+      request.dbName = dbName
+      request.tableName = tableName
+      var catalog = Spark_Connect_Catalog()
+      catalog.tableExists = request
+      return catalog
+    }
+    let rows = try await df.collect()
+    let answer = try rows.first!.get(0) as! String
+    return answer == "true"
+  }
+
+  /// Returns whether a function named `functionName` exists. This can be either a temporary
+  /// function or a function.
+  /// - Parameter functionName: A qualified or unqualified name that designates a function. It
+  ///   follows the same resolution rule as SQL: built-in/temporary functions are searched
+  ///   first, then functions in the current database (namespace).
+  /// - Returns: `true` if it exists.
+  /// - Note: The answer is read from the first column of the first returned row as a `String`
+  ///   and compared with `"true"`; an empty result or a non-`String` value traps at runtime.
+  public func functionExists(_ functionName: String) async throws -> Bool {
+    let df = getDataFrame {
+      var request = Spark_Connect_FunctionExists()
+      request.functionName = functionName
+      var catalog = Spark_Connect_Catalog()
+      catalog.functionExists = request
+      return catalog
+    }
+    let rows = try await df.collect()
+    let answer = try rows.first!.get(0) as! String
+    return answer == "true"
+  }
+
+  /// Returns whether a function named `functionName` exists in the database `dbName` under
+  /// the Hive Metastore.
+  /// - Parameters:
+  ///   - dbName: An unqualified name that designates a database.
+  ///   - functionName: An unqualified name that designates a function.
+  /// - Returns: `true` if it exists.
+  /// - Note: The answer is read from the first column of the first returned row as a `String`
+  ///   and compared with `"true"`; an empty result or a non-`String` value traps at runtime.
+  public func functionExists(_ dbName: String, _ functionName: String) async throws -> Bool {
+    let df = getDataFrame {
+      var request = Spark_Connect_FunctionExists()
+      request.dbName = dbName
+      request.functionName = functionName
+      var catalog = Spark_Connect_Catalog()
+      catalog.functionExists = request
+      return catalog
+    }
+    let rows = try await df.collect()
+    let answer = try rows.first!.get(0) as! String
+    return answer == "true"
+  }
+
   /// Caches the specified table in-memory.
   /// - Parameters:
   ///   - tableName: A qualified or unqualified name that designates a 
table/view.
diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift
index afb0ea7..d98fdc8 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -110,6 +110,50 @@ struct CatalogTests {
     #expect(try await spark.catalog.databaseExists(dbName) == false)
     await spark.stop()
   }
+
+  @Test
+  func createTable() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let suffix = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let tableName = "TABLE_" + suffix
+    let path = "/tmp/\(tableName)"
+    try await SQLHelper.withTable(spark, tableName)({
+      // Write ORC files first, then register a table over that location.
+      try await spark.range(1).write.orc(path)
+      let created = await spark.catalog.createTable(tableName, path, source: "orc")
+      #expect(try await created.count() == 1)
+      #expect(try await spark.catalog.tableExists(tableName))
+    })
+    await spark.stop()
+  }
+
+  @Test
+  func tableExists() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let suffix = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let tableName = "TABLE_" + suffix
+    let path = "/tmp/\(tableName)"
+    try await SQLHelper.withTable(spark, tableName)({
+      try await spark.range(1).write.parquet(path)
+      // Only files exist at this point; no table has been registered yet.
+      #expect(try await spark.catalog.tableExists(tableName) == false)
+      #expect(try await spark.catalog.createTable(tableName, path).count() == 1)
+      // Both the unqualified and the database-qualified lookups should now succeed.
+      #expect(try await spark.catalog.tableExists(tableName))
+      #expect(try await spark.catalog.tableExists("default", tableName))
+      #expect(try await spark.catalog.tableExists("default2", tableName) == false)
+    })
+    // Once `withTable` returns, the table is expected to be gone.
+    #expect(try await spark.catalog.tableExists(tableName) == false)
+
+    // An unparsable name is rejected with an error rather than reported as missing.
+    try await #require(throws: Error.self) {
+      try await spark.catalog.tableExists("invalid table name")
+    }
+    await spark.stop()
+  }
+
+  @Test
+  func functionExists() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    // Unqualified lookup: built-in functions resolve, unknown names do not.
+    #expect(try await spark.catalog.functionExists("base64"))
+    #expect(try await spark.catalog.functionExists("non_exist_function") == false)
+
+    // Database-qualified overload `functionExists(_:_:)`, which the test did not cover.
+    #expect(try await spark.catalog.functionExists("default", "non_exist_function") == false)
+
+    // An unparsable name is rejected with an error rather than reported as missing.
+    try await #require(throws: Error.self) {
+      try await spark.catalog.functionExists("invalid function name")
+    }
+    await spark.stop()
+  }
 #endif
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to