This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe3d3ae6172b [SPARK-50719][PYTHON] Support `interruptOperation` for 
PySpark
fe3d3ae6172b is described below

commit fe3d3ae6172b88734c33da30e900ed3bfae1417c
Author: Haejoon Lee <[email protected]>
AuthorDate: Fri Jan 10 12:23:10 2025 +0800

    [SPARK-50719][PYTHON] Support `interruptOperation` for PySpark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to support `interruptOperation` for PySpark
    
    ### Why are the changes needed?
    
    For feature parity with Spark Connect
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this adds a new API
    
    ### How was this patch tested?
    
    The existing CI should pass
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49423 from itholic/interrupt_operation.
    
    Authored-by: Haejoon Lee <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../docs/source/reference/pyspark.sql/spark_session.rst |  2 +-
 python/pyspark/sql/session.py                           | 17 ++++++++++++-----
 python/pyspark/sql/tests/test_connect_compatibility.py  |  1 -
 python/pyspark/sql/tests/test_session.py                |  1 -
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst 
b/python/docs/source/reference/pyspark.sql/spark_session.rst
index a35fccbcffe9..0d6a1bc79b90 100644
--- a/python/docs/source/reference/pyspark.sql/spark_session.rst
+++ b/python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -53,6 +53,7 @@ See also :class:`SparkSession`.
     SparkSession.getActiveSession
     SparkSession.getTags
     SparkSession.interruptAll
+    SparkSession.interruptOperation
     SparkSession.interruptTag
     SparkSession.newSession
     SparkSession.profile
@@ -88,6 +89,5 @@ Spark Connect Only
     SparkSession.clearProgressHandlers
     SparkSession.client
     SparkSession.copyFromLocalToFs
-    SparkSession.interruptOperation
     SparkSession.registerProgressHandler
     SparkSession.removeProgressHandler
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index fc434cd16bfb..5ab186b2957e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -2253,13 +2253,15 @@ class SparkSession(SparkConversionMixin):
 
         return python_list
 
-    @remote_only
     def interruptOperation(self, op_id: str) -> List[str]:
         """
         Interrupt an operation of this session with the given operationId.
 
         .. versionadded:: 3.5.0
 
+        .. versionchanged:: 4.0.0
+            Supports Spark Classic.
+
         Returns
         -------
         list of str
@@ -2269,10 +2271,15 @@ class SparkSession(SparkConversionMixin):
         -----
         There is still a possibility of operation finishing just as it is 
interrupted.
         """
-        raise PySparkRuntimeError(
-            errorClass="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
-            messageParameters={"feature": "SparkSession.interruptOperation"},
-        )
+        java_list = self._jsparkSession.interruptOperation(op_id)
+        python_list = list()
+
+        # Use iterator to manually iterate through Java list
+        java_iterator = java_list.iterator()
+        while java_iterator.hasNext():
+            python_list.append(str(java_iterator.next()))
+
+        return python_list
 
     def addTag(self, tag: str) -> None:
         """
diff --git a/python/pyspark/sql/tests/test_connect_compatibility.py 
b/python/pyspark/sql/tests/test_connect_compatibility.py
index 25b8be1f9ac7..4ac68292b402 100644
--- a/python/pyspark/sql/tests/test_connect_compatibility.py
+++ b/python/pyspark/sql/tests/test_connect_compatibility.py
@@ -266,7 +266,6 @@ class ConnectCompatibilityTestsMixin:
             "addArtifacts",
             "clearProgressHandlers",
             "copyFromLocalToFs",
-            "interruptOperation",
             "newSession",
             "registerProgressHandler",
             "removeProgressHandler",
diff --git a/python/pyspark/sql/tests/test_session.py 
b/python/pyspark/sql/tests/test_session.py
index a22fe777e3c9..c21247e3159c 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -227,7 +227,6 @@ class SparkSessionTests3(unittest.TestCase, 
PySparkErrorTestUtils):
                 (lambda: session.client, "client"),
                 (session.addArtifacts, "addArtifact(s)"),
                 (lambda: session.copyFromLocalToFs("", ""), 
"copyFromLocalToFs"),
-                (lambda: session.interruptOperation(""), "interruptOperation"),
             ]
 
             for func, name in unsupported:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to