This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push: new ac3c85e [SPARK-51970] Support to create and drop temporary views in `DataFrame` and `Catalog` ac3c85e is described below commit ac3c85e4ab405de36c30f6b2f689fd67ad557b15 Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Wed Apr 30 19:35:46 2025 -0700 [SPARK-51970] Support to create and drop temporary views in `DataFrame` and `Catalog` ### What changes were proposed in this pull request? This PR aims to add the following APIs. - `DataFrame` - `createTempView` - `createOrReplaceTempView` - `createGlobalTempView` - `createOrReplaceGlobalTempView` - `Catalog` - `dropTempView` - `dropGlobalTempView` - `SQLHelper` - `withTempView` - `withGlobalTempView` ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #98 from dongjoon-hyun/SPARK-51970. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- Sources/SparkConnect/Catalog.swift | 32 ++++++++ Sources/SparkConnect/DataFrame.swift | 32 ++++++++ Sources/SparkConnect/SparkConnectClient.swift | 14 ++++ Tests/SparkConnectTests/CatalogTests.swift | 111 ++++++++++++++++++++++++++ Tests/SparkConnectTests/SQLHelper.swift | 30 +++++++ 5 files changed, 219 insertions(+) diff --git a/Sources/SparkConnect/Catalog.swift b/Sources/SparkConnect/Catalog.swift index 1fe0d8f..04bc29c 100644 --- a/Sources/SparkConnect/Catalog.swift +++ b/Sources/SparkConnect/Catalog.swift @@ -393,4 +393,36 @@ public actor Catalog: Sendable { }) try await df.count() } + + /// Drops the local temporary view with the given view name in the catalog. If the view has been + /// cached before, then it will also be uncached. + /// - Parameter viewName: The name of the temporary view to be dropped. + /// - Returns: true if the view is dropped successfully, false otherwise. + @discardableResult + public func dropTempView(_ viewName: String) async throws -> Bool { + let df = getDataFrame({ + var dropTempView = Spark_Connect_DropTempView() + dropTempView.viewName = viewName + var catalog = Spark_Connect_Catalog() + catalog.dropTempView = dropTempView + return catalog + }) + return "true" == (try await df.collect().first!.get(0) as! String) + } + + /// Drops the global temporary view with the given view name in the catalog. If the view has been + /// cached before, then it will also be uncached. + /// - Parameter viewName: The unqualified name of the temporary view to be dropped. + /// - Returns: true if the view is dropped successfully, false otherwise. + @discardableResult + public func dropGlobalTempView(_ viewName: String) async throws -> Bool { + let df = getDataFrame({ + var dropGlobalTempView = Spark_Connect_DropGlobalTempView() + dropGlobalTempView.viewName = viewName + var catalog = Spark_Connect_Catalog() + catalog.dropGlobalTempView = dropGlobalTempView + return catalog + }) + return "true" == (try await df.collect().first!.get(0) as! String) + } } diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index 12c855c..d3eb909 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -856,6 +856,38 @@ public actor DataFrame: Sendable { return GroupedData(self, GroupType.cube, cols) } + /// Creates a local temporary view using the given name. The lifetime of this temporary view is + /// tied to the `SparkSession` that was used to create this ``DataFrame``. + /// - Parameter viewName: A view name. + public func createTempView(_ viewName: String) async throws { + try await createTempView(viewName, replace: false, global: false) + } + + /// Creates a local temporary view using the given name. The lifetime of this temporary view is + /// tied to the `SparkSession` that was used to create this ``DataFrame``. + /// - Parameter viewName: A view name. + public func createOrReplaceTempView(_ viewName: String) async throws { + try await createTempView(viewName, replace: true, global: false) + } + + /// Creates a global temporary view using the given name. The lifetime of this temporary view is + /// tied to this Spark application, but is cross-session. + /// - Parameter viewName: A view name. + public func createGlobalTempView(_ viewName: String) async throws { + try await createTempView(viewName, replace: false, global: true) + } + + /// Creates a global temporary view using the given name. The lifetime of this temporary view is + /// tied to this Spark application, but is cross-session. + /// - Parameter viewName: A view name. + public func createOrReplaceGlobalTempView(_ viewName: String) async throws { + try await createTempView(viewName, replace: true, global: true) + } + + func createTempView(_ viewName: String, replace: Bool, global: Bool) async throws { + try await spark.client.createTempView(self.plan.root, viewName, replace: replace, isGlobal: global) + } + /// Returns a ``DataFrameWriter`` that can be used to write non-streaming data. public var write: DataFrameWriter { get { diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index 57eaffd..d74c1dd 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -729,6 +729,20 @@ public actor SparkConnectClient { return plan } + func createTempView( + _ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool + ) async throws { + var viewCommand = Spark_Connect_CreateDataFrameViewCommand() + viewCommand.input = child + viewCommand.name = viewName + viewCommand.replace = replace + viewCommand.isGlobal = isGlobal + + var command = Spark_Connect_Command() + command.createDataframeView = viewCommand + try await execute(self.sessionID!, command) + } + private enum URIParams { static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size" static let PARAM_SESSION_ID = "session_id" diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift index d98fdc8..134a3ac 100644 --- a/Tests/SparkConnectTests/CatalogTests.swift +++ b/Tests/SparkConnectTests/CatalogTests.swift @@ -154,6 +154,117 @@ struct CatalogTests { } await spark.stop() } + + @Test + func createTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTempView(spark, viewName)({ + #expect(try await spark.catalog.tableExists(viewName) == false) + try await spark.range(1).createTempView(viewName) + #expect(try await spark.catalog.tableExists(viewName)) + + try await #require(throws: Error.self) { + try await spark.range(1).createTempView(viewName) + } + }) + + try await #require(throws: Error.self) { + try await spark.range(1).createTempView("invalid view name") + } + + await spark.stop() + } + + @Test + func createOrReplaceTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTempView(spark, viewName)({ + #expect(try await spark.catalog.tableExists(viewName) == false) + try await spark.range(1).createOrReplaceTempView(viewName) + #expect(try await spark.catalog.tableExists(viewName)) + try await spark.range(1).createOrReplaceTempView(viewName) + }) + + try await #require(throws: Error.self) { + try await spark.range(1).createOrReplaceTempView("invalid view name") + } + + await spark.stop() + } + + @Test + func createGlobalTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withGlobalTempView(spark, viewName)({ + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) + try await spark.range(1).createGlobalTempView(viewName) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)")) + + try await #require(throws: Error.self) { + try await spark.range(1).createGlobalTempView(viewName) + } + }) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) + + try await #require(throws: Error.self) { + try await spark.range(1).createGlobalTempView("invalid view name") + } + + await spark.stop() + } + + @Test + func createOrReplaceGlobalTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withGlobalTempView(spark, viewName)({ + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) + try await spark.range(1).createOrReplaceGlobalTempView(viewName) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)")) + try await spark.range(1).createOrReplaceGlobalTempView(viewName) + }) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) + + try await #require(throws: Error.self) { + try await spark.range(1).createOrReplaceGlobalTempView("invalid view name") + } + + await spark.stop() + } + + @Test + func dropTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTempView(spark, viewName)({ #expect(try await spark.catalog.tableExists(viewName) == false) + try await spark.range(1).createTempView(viewName) + try await spark.catalog.dropTempView(viewName) + #expect(try await spark.catalog.tableExists(viewName) == false) + }) + + #expect(try await spark.catalog.dropTempView("non_exist_view") == false) + #expect(try await spark.catalog.dropTempView("invalid view name") == false) + await spark.stop() + } + + @Test + func dropGlobalTempView() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") + try await SQLHelper.withTempView(spark, viewName)({ #expect(try await spark.catalog.tableExists(viewName) == false) + try await spark.range(1).createGlobalTempView(viewName) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)")) + try await spark.catalog.dropGlobalTempView(viewName) + #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) + }) + + #expect(try await spark.catalog.dropGlobalTempView("non_exist_view") == false) + #expect(try await spark.catalog.dropGlobalTempView("invalid view name") == false) + await spark.stop() + } #endif @Test diff --git a/Tests/SparkConnectTests/SQLHelper.swift b/Tests/SparkConnectTests/SQLHelper.swift index 162ad67..0e96f0c 100644 --- a/Tests/SparkConnectTests/SQLHelper.swift +++ b/Tests/SparkConnectTests/SQLHelper.swift @@ -53,4 +53,34 @@ struct SQLHelper { } return body } + + public static func withTempView(_ spark: SparkSession, _ viewNames: String...) -> ( + () async throws -> Void + ) async throws -> Void { + func body(_ f: () async throws -> Void) async throws { + try await ErrorUtils.tryWithSafeFinally( + f, + { + for name in viewNames { + try await spark.catalog.dropTempView(name) + } + }) + } + return body + } + + public static func withGlobalTempView(_ spark: SparkSession, _ viewNames: String...) -> ( + () async throws -> Void + ) async throws -> Void { + func body(_ f: () async throws -> Void) async throws { + try await ErrorUtils.tryWithSafeFinally( + f, + { + for name in viewNames { + try await spark.catalog.dropGlobalTempView(name) + } + }) + } + return body + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org