This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 81cb3e88b956 [SPARK-54744][PYTHON] Only invalidate cache when necessary
81cb3e88b956 is described below
commit 81cb3e88b956af259ffa7465578d77f14b5073fc
Author: Tian Gao <[email protected]>
AuthorDate: Fri Dec 19 15:15:20 2025 +0800
[SPARK-54744][PYTHON] Only invalidate cache when necessary
### What changes were proposed in this pull request?
Only invalidate cache when necessary to reduce overhead in worker process.
### Why are the changes needed?
To reduce overhead for worker process.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Confirmed in viztracer.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53518 from gaogaotiantian/avoid-unnecessary-invalidate.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/worker_util.py | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index c2f35db8d52d..ccca0630d2af 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -50,11 +50,13 @@ pickleSer = CPickleSerializer()
utf8_deserializer = UTF8Deserializer()
-def add_path(path: str) -> None:
+def add_path(path: str) -> bool:
# worker can be used, so do not add path multiple times
if path not in sys.path:
# overwrite system packages
sys.path.insert(1, path)
+ return True
+ return False
def read_command(serializer: FramedSerializer, file: IO) -> Any:
@@ -135,13 +137,14 @@ def setup_spark_files(infile: IO) -> None:
SparkFiles._is_running_on_worker = True
# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
- add_path(spark_files_dir) # *.py files that were added will be copied here
+ path_changed = add_path(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
- add_path(os.path.join(spark_files_dir, filename))
+ path_changed = add_path(os.path.join(spark_files_dir, filename)) or path_changed
- importlib.invalidate_caches()
+ if path_changed:
+ importlib.invalidate_caches()
def setup_broadcasts(infile: IO) -> None:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]