This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81cb3e88b956 [SPARK-54744][PYTHON] Only invalidate cache when necessary
81cb3e88b956 is described below

commit 81cb3e88b956af259ffa7465578d77f14b5073fc
Author: Tian Gao <[email protected]>
AuthorDate: Fri Dec 19 15:15:20 2025 +0800

    [SPARK-54744][PYTHON] Only invalidate cache when necessary
    
    ### What changes were proposed in this pull request?
    
    Only invalidate the import caches when a new path was actually added to `sys.path`, to reduce overhead in the worker process.
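
    As background, a minimal standalone sketch (not Spark code) of why the
    invalidation matters at all when paths are added: importlib's
    path-based finders cache directory listings, so a module file created
    after those caches were populated may not be importable until
    importlib.invalidate_caches() is called.

        import importlib
        import os
        import sys
        import tempfile

        path = tempfile.mkdtemp()
        sys.path.insert(1, path)

        try:
            import definitely_not_a_module  # forces a scan of every sys.path entry
        except ImportError:
            pass

        # Create a module file after the finder caches were populated.
        with open(os.path.join(path, "late_module.py"), "w") as f:
            f.write("VALUE = 42\n")

        # Without this call the import below can fail, depending on
        # filesystem timestamp granularity; with it, it always succeeds.
        importlib.invalidate_caches()
        import late_module

        print(late_module.VALUE)  # 42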
    
    ### Why are the changes needed?
    
    To reduce overhead in the worker process: `importlib.invalidate_caches()` was previously called on every `setup_spark_files` invocation, even when `sys.path` had not changed.
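
    As a rough, hypothetical micro-benchmark (not part of this PR), the
    cost being skipped can be timed directly; absolute numbers vary by
    Python version and by how many finders are registered:

        import importlib
        import timeit

        # Average cost of one importlib.invalidate_caches() call.
        cost = timeit.timeit(importlib.invalidate_caches, number=1000)
        print(f"{cost / 1000 * 1e6:.1f} us per call")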
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Confirmed the reduced overhead by profiling with viztracer.
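
    For reference, a sketch of this kind of check with viztracer; the
    traced function below is a hypothetical stand-in, since the real
    setup_spark_files runs inside a live worker process:

        import importlib
        from viztracer import VizTracer

        def stand_in_worker_setup():
            importlib.invalidate_caches()

        # Inspect the resulting trace with: vizviewer trace.json
        with VizTracer(output_file="trace.json"):
            stand_in_worker_setup()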
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53518 from gaogaotiantian/avoid-unnecessary-invalidate.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/worker_util.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index c2f35db8d52d..ccca0630d2af 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -50,11 +50,13 @@ pickleSer = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
 
 
-def add_path(path: str) -> None:
+def add_path(path: str) -> bool:
     # worker can be reused, so do not add the same path multiple times
     if path not in sys.path:
         # overwrite system packages
         sys.path.insert(1, path)
+        return True
+    return False
 
 
 def read_command(serializer: FramedSerializer, file: IO) -> Any:
@@ -135,13 +137,14 @@ def setup_spark_files(infile: IO) -> None:
         SparkFiles._is_running_on_worker = True
 
     # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    add_path(spark_files_dir)  # *.py files that were added will be copied here
+    path_changed = add_path(spark_files_dir)  # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
         filename = utf8_deserializer.loads(infile)
-        add_path(os.path.join(spark_files_dir, filename))
+        path_changed = add_path(os.path.join(spark_files_dir, filename)) or path_changed
 
-    importlib.invalidate_caches()
+    if path_changed:
+        importlib.invalidate_caches()
 
 
 def setup_broadcasts(infile: IO) -> None:
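
A note on the second hunk: add_path(...) is kept as the left operand of
`or` so that it is always evaluated; reversing the operands would
short-circuit and skip adding later paths once path_changed became True.
A small illustration (the stub below is hypothetical, not Spark code):

    path_changed = False

    def add_path_stub(p):
        print("adding", p)
        return True

    path_changed = add_path_stub("a") or path_changed   # prints "adding a"
    path_changed = path_changed or add_path_stub("b")   # skipped: already True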


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
