This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new ac3c85e  [SPARK-51970] Support to create and drop temporary views in 
`DataFrame` and `Catalog`
ac3c85e is described below

commit ac3c85e4ab405de36c30f6b2f689fd67ad557b15
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Apr 30 19:35:46 2025 -0700

    [SPARK-51970] Support to create and drop temporary views in `DataFrame` and 
`Catalog`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add the following APIs.
    
    - `DataFrame`
        - `createTempView`
        - `createOrReplaceTempView`
        - `createGlobalTempView`
        - `createOrReplaceGlobalTempView`
    
    - `Catalog`
        - `dropTempView`
        - `dropGlobalTempView`
    
    - `SQLHelper`
        - `withTempView`
        - `withGlobalTempView`
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #98 from dongjoon-hyun/SPARK-51970.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/Catalog.swift            |  32 ++++++++
 Sources/SparkConnect/DataFrame.swift          |  32 ++++++++
 Sources/SparkConnect/SparkConnectClient.swift |  14 ++++
 Tests/SparkConnectTests/CatalogTests.swift    | 111 ++++++++++++++++++++++++++
 Tests/SparkConnectTests/SQLHelper.swift       |  30 +++++++
 5 files changed, 219 insertions(+)

diff --git a/Sources/SparkConnect/Catalog.swift 
b/Sources/SparkConnect/Catalog.swift
index 1fe0d8f..04bc29c 100644
--- a/Sources/SparkConnect/Catalog.swift
+++ b/Sources/SparkConnect/Catalog.swift
@@ -393,4 +393,36 @@ public actor Catalog: Sendable {
     })
     try await df.count()
   }
+
+  /// Drops the local temporary view with the given view name in the catalog. 
If the view has been
+  /// cached before, then it will also be uncached.
+  /// - Parameter viewName: The name of the temporary view to be dropped.
+  /// - Returns: true if the view is dropped successfully, false otherwise.
+  @discardableResult
+  public func dropTempView(_ viewName: String) async throws -> Bool {
+    let df = getDataFrame({
+      var dropTempView = Spark_Connect_DropTempView()
+      dropTempView.viewName = viewName
+      var catalog = Spark_Connect_Catalog()
+      catalog.dropTempView = dropTempView
+      return catalog
+    })
+    return "true" == (try await df.collect().first!.get(0) as! String)
+  }
+
+  /// Drops the global temporary view with the given view name in the catalog. 
If the view has been
+  /// cached before, then it will also be uncached.
+  /// - Parameter viewName: The unqualified name of the temporary view to be 
dropped.
+  /// - Returns: true if the view is dropped successfully, false otherwise.
+  @discardableResult
+  public func dropGlobalTempView(_ viewName: String) async throws -> Bool {
+    let df = getDataFrame({
+      var dropGlobalTempView = Spark_Connect_DropGlobalTempView()
+      dropGlobalTempView.viewName = viewName
+      var catalog = Spark_Connect_Catalog()
+      catalog.dropGlobalTempView = dropGlobalTempView
+      return catalog
+    })
+    return "true" == (try await df.collect().first!.get(0) as! String)
+  }
 }
diff --git a/Sources/SparkConnect/DataFrame.swift 
b/Sources/SparkConnect/DataFrame.swift
index 12c855c..d3eb909 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -856,6 +856,38 @@ public actor DataFrame: Sendable {
     return GroupedData(self, GroupType.cube, cols)
   }
 
+  /// Creates a local temporary view using the given name. The lifetime of 
this temporary view is
+  /// tied to the `SparkSession` that was used to create this ``DataFrame``.
+  /// - Parameter viewName: A view name.
+  public func createTempView(_ viewName: String) async throws {
+    try await createTempView(viewName, replace: false, global: false)
+  }
+
+  /// Creates or replaces a local temporary view using the given name. The lifetime of 
this temporary view is
+  /// tied to the `SparkSession` that was used to create this ``DataFrame``.
+  /// - Parameter viewName: A view name.
+  public func createOrReplaceTempView(_ viewName: String) async throws {
+    try await createTempView(viewName, replace: true, global: false)
+  }
+
+  /// Creates a global temporary view using the given name. The lifetime of 
this temporary view is
+  /// tied to this Spark application, but is cross-session.
+  /// - Parameter viewName: A view name.
+  public func createGlobalTempView(_ viewName: String) async throws {
+    try await createTempView(viewName, replace: false, global: true)
+  }
+
+  /// Creates or replaces a global temporary view using the given name. The lifetime of 
this temporary view is
+  /// tied to this Spark application, but is cross-session.
+  /// - Parameter viewName: A view name.
+  public func createOrReplaceGlobalTempView(_ viewName: String) async throws {
+    try await createTempView(viewName, replace: true, global: true)
+  }
+
+  func createTempView(_ viewName: String, replace: Bool, global: Bool) async 
throws {
+    try await spark.client.createTempView(self.plan.root, viewName, replace: 
replace, isGlobal: global)
+  }
+
   /// Returns a ``DataFrameWriter`` that can be used to write non-streaming 
data.
   public var write: DataFrameWriter {
     get {
diff --git a/Sources/SparkConnect/SparkConnectClient.swift 
b/Sources/SparkConnect/SparkConnectClient.swift
index 57eaffd..d74c1dd 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -729,6 +729,20 @@ public actor SparkConnectClient {
     return plan
   }
 
+  func createTempView(
+    _ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool
+  ) async throws {
+    var viewCommand = Spark_Connect_CreateDataFrameViewCommand()
+    viewCommand.input = child
+    viewCommand.name = viewName
+    viewCommand.replace = replace
+    viewCommand.isGlobal = isGlobal
+
+    var command = Spark_Connect_Command()
+    command.createDataframeView = viewCommand
+    try await execute(self.sessionID!, command)
+  }
+
   private enum URIParams {
     static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
     static let PARAM_SESSION_ID = "session_id"
diff --git a/Tests/SparkConnectTests/CatalogTests.swift 
b/Tests/SparkConnectTests/CatalogTests.swift
index d98fdc8..134a3ac 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -154,6 +154,117 @@ struct CatalogTests {
     }
     await spark.stop()
   }
+
+  @Test
+  func createTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTempView(spark, viewName)({
+      #expect(try await spark.catalog.tableExists(viewName) == false)
+      try await spark.range(1).createTempView(viewName)
+      #expect(try await spark.catalog.tableExists(viewName))
+
+      try await #require(throws: Error.self) {
+        try await spark.range(1).createTempView(viewName)
+      }
+    })
+
+    try await #require(throws: Error.self) {
+      try await spark.range(1).createTempView("invalid view name")
+    }
+
+    await spark.stop()
+  }
+
+  @Test
+  func createOrReplaceTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTempView(spark, viewName)({
+      #expect(try await spark.catalog.tableExists(viewName) == false)
+      try await spark.range(1).createOrReplaceTempView(viewName)
+      #expect(try await spark.catalog.tableExists(viewName))
+      try await spark.range(1).createOrReplaceTempView(viewName)
+    })
+
+    try await #require(throws: Error.self) {
+      try await spark.range(1).createOrReplaceTempView("invalid view name")
+    }
+
+    await spark.stop()
+  }
+
+  @Test
+  func createGlobalTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withGlobalTempView(spark, viewName)({
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") 
== false)
+      try await spark.range(1).createGlobalTempView(viewName)
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
+
+      try await #require(throws: Error.self) {
+        try await spark.range(1).createGlobalTempView(viewName)
+      }
+    })
+    #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == 
false)
+
+    try await #require(throws: Error.self) {
+      try await spark.range(1).createGlobalTempView("invalid view name")
+    }
+
+    await spark.stop()
+  }
+
+  @Test
+  func createOrReplaceGlobalTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withGlobalTempView(spark, viewName)({
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") 
== false)
+      try await spark.range(1).createOrReplaceGlobalTempView(viewName)
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
+      try await spark.range(1).createOrReplaceGlobalTempView(viewName)
+    })
+    #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == 
false)
+
+    try await #require(throws: Error.self) {
+      try await spark.range(1).createOrReplaceGlobalTempView("invalid view 
name")
+    }
+
+    await spark.stop()
+  }
+
+  @Test
+  func dropTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withTempView(spark, viewName)({      #expect(try await 
spark.catalog.tableExists(viewName) == false)
+      try await spark.range(1).createTempView(viewName)
+      try await spark.catalog.dropTempView(viewName)
+      #expect(try await spark.catalog.tableExists(viewName) == false)
+    })
+
+    #expect(try await spark.catalog.dropTempView("non_exist_view") == false)
+    #expect(try await spark.catalog.dropTempView("invalid view name") == false)
+    await spark.stop()
+  }
+
+  @Test
+  func dropGlobalTempView() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
+    try await SQLHelper.withGlobalTempView(spark, viewName)({      #expect(try await 
spark.catalog.tableExists("global_temp.\(viewName)") == false)
+      try await spark.range(1).createGlobalTempView(viewName)
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
+      try await spark.catalog.dropGlobalTempView(viewName)
+      #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") 
== false)
+    })
+
+    #expect(try await spark.catalog.dropGlobalTempView("non_exist_view") == 
false)
+    #expect(try await spark.catalog.dropGlobalTempView("invalid view name") == 
false)
+    await spark.stop()
+  }
 #endif
 
   @Test
diff --git a/Tests/SparkConnectTests/SQLHelper.swift 
b/Tests/SparkConnectTests/SQLHelper.swift
index 162ad67..0e96f0c 100644
--- a/Tests/SparkConnectTests/SQLHelper.swift
+++ b/Tests/SparkConnectTests/SQLHelper.swift
@@ -53,4 +53,34 @@ struct SQLHelper {
     }
     return body
   }
+
+  public static func withTempView(_ spark: SparkSession, _ viewNames: 
String...) -> (
+    () async throws -> Void
+  ) async throws -> Void {
+    func body(_ f: () async throws -> Void) async throws {
+      try await ErrorUtils.tryWithSafeFinally(
+        f,
+        {
+          for name in viewNames {
+            try await spark.catalog.dropTempView(name)
+          }
+        })
+    }
+    return body
+  }
+
+  public static func withGlobalTempView(_ spark: SparkSession, _ viewNames: 
String...) -> (
+    () async throws -> Void
+  ) async throws -> Void {
+    func body(_ f: () async throws -> Void) async throws {
+      try await ErrorUtils.tryWithSafeFinally(
+        f,
+        {
+          for name in viewNames {
+            try await spark.catalog.dropGlobalTempView(name)
+          }
+        })
+    }
+    return body
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to