This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push: new 9499253 [SPARK-51992] Support `interrupt(Tag|Operation|All)` in `SparkSession` 9499253 is described below commit 9499253c6423914d518dbabdeb42c3a63ebca828 Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Sat May 3 12:06:38 2025 -0700 [SPARK-51992] Support `interrupt(Tag|Operation|All)` in `SparkSession` ### What changes were proposed in this pull request? This PR aims to support `interrupt(Tag|Operation|All)` API of `SparkSession`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #108 from dongjoon-hyun/SPARK-51992. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../Documentation.docc/SparkSession.md | 18 ++++++--- Sources/SparkConnect/SparkConnectClient.swift | 44 ++++++++++++++++++++++ Sources/SparkConnect/SparkSession.swift | 21 +++++++++++ Tests/SparkConnectTests/SparkSessionTests.swift | 21 +++++++++++ 4 files changed, 99 insertions(+), 5 deletions(-) diff --git a/Sources/SparkConnect/Documentation.docc/SparkSession.md b/Sources/SparkConnect/Documentation.docc/SparkSession.md index 5a86034..9bd4f78 100644 --- a/Sources/SparkConnect/Documentation.docc/SparkSession.md +++ b/Sources/SparkConnect/Documentation.docc/SparkSession.md @@ -32,15 +32,13 @@ let csvDf = spark.read.csv("path/to/file.csv") ### Creating Sessions -- ``builder()`` -- ``active()`` +- ``builder`` - ``stop()`` -### DataFrame Operations +### DataFrame Operations -- ``range(_:_:)`` +- ``range(_:_:_:)`` - ``sql(_:)`` -- ``createDataFrame(_:_:)`` ### Data I/O @@ -53,3 +51,13 @@ let csvDf = spark.read.csv("path/to/file.csv") ### Catalog Operations - ``catalog`` + +### Managing Operations + +- ``addTag(_:)`` +- ``removeTag(_:)`` +- ``getTags()`` +- ``clearTags()`` +- ``interruptAll()`` +- ``interruptTag(_:)`` +- ``interruptOperation(_:)`` diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index d74c1dd..016f89f 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -559,6 +559,50 @@ public actor SparkConnectClient { tags.removeAll() } + public func interruptAll() async throws -> [String] { + var request = Spark_Connect_InterruptRequest() + request.sessionID = self.sessionID! + request.userContext = self.userContext + request.clientType = self.clientType + request.interruptType = .all + + return try await withGPRC { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + let response = try await service.interrupt(request) + return response.interruptedIds + } + } + + public func interruptTag(_ tag: String) async throws -> [String] { + var request = Spark_Connect_InterruptRequest() + request.sessionID = self.sessionID! + request.userContext = self.userContext + request.clientType = self.clientType + request.interruptType = .tag + request.operationTag = tag + + return try await withGPRC { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + let response = try await service.interrupt(request) + return response.interruptedIds + } + } + + public func interruptOperation(_ operationId: String) async throws -> [String] { + var request = Spark_Connect_InterruptRequest() + request.sessionID = self.sessionID! + request.userContext = self.userContext + request.clientType = self.clientType + request.interruptType = .operationID + request.operationID = operationId + + return try await withGPRC { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + let response = try await service.interrupt(request) + return response.interruptedIds + } + } + /// Parse a DDL string to ``Spark_Connect_DataType`` instance. /// - Parameter ddlString: A string to parse. /// - Returns: A ``Spark_Connect_DataType`` instance. diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift index abfe908..bb8b534 100644 --- a/Sources/SparkConnect/SparkSession.swift +++ b/Sources/SparkConnect/SparkSession.swift @@ -314,6 +314,27 @@ public actor SparkSession { await client.clearTags() } + /// Request to interrupt all currently running operations of this session. + /// - Returns: Sequence of operation IDs requested to be interrupted. + @discardableResult + public func interruptAll() async throws -> [String] { + return try await client.interruptAll() + } + + /// Request to interrupt all currently running operations of this session with the given job tag. + /// - Returns: Sequence of operation IDs requested to be interrupted. + @discardableResult + public func interruptTag(_ tag: String) async throws -> [String] { + return try await client.interruptTag(tag) + } + + /// Request to interrupt an operation of this session, given its operation ID. + /// - Returns: Sequence of operation IDs requested to be interrupted. + @discardableResult + public func interruptOperation(_ operationId: String) async throws -> [String] { + return try await client.interruptOperation(operationId) + } + func sameSemantics(_ plan: Plan, _ otherPlan: Plan) async throws -> Bool { return try await client.sameSemantics(plan, otherPlan) } diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift index 69f0aee..46bdab5 100644 --- a/Tests/SparkConnectTests/SparkSessionTests.swift +++ b/Tests/SparkConnectTests/SparkSessionTests.swift @@ -136,4 +136,25 @@ struct SparkSessionTests { } await spark.stop() } + + @Test + func interruptAll() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.interruptAll() == []) + await spark.stop() + } + + @Test + func interruptTag() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.interruptTag("etl") == []) + await spark.stop() + } + + @Test + func interruptOperation() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.interruptOperation("id") == []) + await spark.stop() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org