This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 9499253  [SPARK-51992] Support `interrupt(Tag|Operation|All)` in `SparkSession`
9499253 is described below

commit 9499253c6423914d518dbabdeb42c3a63ebca828
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat May 3 12:06:38 2025 -0700

    [SPARK-51992] Support `interrupt(Tag|Operation|All)` in `SparkSession`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `interrupt(Tag|Operation|All)` API of `SparkSession`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #108 from dongjoon-hyun/SPARK-51992.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../Documentation.docc/SparkSession.md             | 18 ++++++---
 Sources/SparkConnect/SparkConnectClient.swift      | 44 ++++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift            | 21 +++++++++++
 Tests/SparkConnectTests/SparkSessionTests.swift    | 21 +++++++++++
 4 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/Sources/SparkConnect/Documentation.docc/SparkSession.md b/Sources/SparkConnect/Documentation.docc/SparkSession.md
index 5a86034..9bd4f78 100644
--- a/Sources/SparkConnect/Documentation.docc/SparkSession.md
+++ b/Sources/SparkConnect/Documentation.docc/SparkSession.md
@@ -32,15 +32,13 @@ let csvDf = spark.read.csv("path/to/file.csv")
 
 ### Creating Sessions
 
-- ``builder()``
-- ``active()``
+- ``builder``
 - ``stop()``
 
-### DataFrame Operations  
+### DataFrame Operations
 
-- ``range(_:_:)``
+- ``range(_:_:_:)``
 - ``sql(_:)``
-- ``createDataFrame(_:_:)``
 
 ### Data I/O
 
@@ -53,3 +51,13 @@ let csvDf = spark.read.csv("path/to/file.csv")
 ### Catalog Operations
 
 - ``catalog``
+
+### Managing Operations
+
+- ``addTag(_:)``
+- ``removeTag(_:)``
+- ``getTags()``
+- ``clearTags()``
+- ``interruptAll()``
+- ``interruptTag(_:)``
+- ``interruptOperation(_:)``
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index d74c1dd..016f89f 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -559,6 +559,50 @@ public actor SparkConnectClient {
     tags.removeAll()
   }
 
+  public func interruptAll() async throws -> [String] {
+    var request = Spark_Connect_InterruptRequest()
+    request.sessionID = self.sessionID!
+    request.userContext = self.userContext
+    request.clientType = self.clientType
+    request.interruptType = .all
+
+    return try await withGPRC { client in
+      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
+      let response = try await service.interrupt(request)
+      return response.interruptedIds
+    }
+  }
+
+  public func interruptTag(_ tag: String) async throws -> [String] {
+    var request = Spark_Connect_InterruptRequest()
+    request.sessionID = self.sessionID!
+    request.userContext = self.userContext
+    request.clientType = self.clientType
+    request.interruptType = .tag
+    request.operationTag = tag
+
+    return try await withGPRC { client in
+      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
+      let response = try await service.interrupt(request)
+      return response.interruptedIds
+    }
+  }
+
+  public func interruptOperation(_ operationId: String) async throws -> [String] {
+    var request = Spark_Connect_InterruptRequest()
+    request.sessionID = self.sessionID!
+    request.userContext = self.userContext
+    request.clientType = self.clientType
+    request.interruptType = .operationID
+    request.operationID = operationId
+
+    return try await withGPRC { client in
+      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
+      let response = try await service.interrupt(request)
+      return response.interruptedIds
+    }
+  }
+
   /// Parse a DDL string to ``Spark_Connect_DataType`` instance.
   /// - Parameter ddlString: A string to parse.
   /// - Returns: A ``Spark_Connect_DataType`` instance.
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index abfe908..bb8b534 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -314,6 +314,27 @@ public actor SparkSession {
     await client.clearTags()
   }
 
+  /// Request to interrupt all currently running operations of this session.
+  /// - Returns: Sequence of operation IDs requested to be interrupted.
+  @discardableResult
+  public func interruptAll() async throws -> [String] {
+    return try await client.interruptAll()
+  }
+
+  /// Request to interrupt all currently running operations of this session with the given job tag.
+  /// - Returns: Sequence of operation IDs requested to be interrupted.
+  @discardableResult
+  public func interruptTag(_ tag: String) async throws -> [String] {
+    return try await client.interruptTag(tag)
+  }
+
+  /// Request to interrupt an operation of this session, given its operation ID.
+  /// - Returns: Sequence of operation IDs requested to be interrupted.
+  @discardableResult
+  public func interruptOperation(_ operationId: String) async throws -> [String] {
+    return try await client.interruptOperation(operationId)
+  }
+
   func sameSemantics(_ plan: Plan, _ otherPlan: Plan) async throws -> Bool {
     return try await client.sameSemantics(plan, otherPlan)
   }
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index 69f0aee..46bdab5 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -136,4 +136,25 @@ struct SparkSessionTests {
     }
     await spark.stop()
   }
+
+  @Test
+  func interruptAll() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.interruptAll() == [])
+    await spark.stop()
+  }
+
+  @Test
+  func interruptTag() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.interruptTag("etl") == [])
+    await spark.stop()
+  }
+
+  @Test
+  func interruptOperation() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.interruptOperation("id") == [])
+    await spark.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to