This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc59165ad48 [SPARK-55663][PYTHON] Unify __module__ for data source functions
1cc59165ad48 is described below

commit 1cc59165ad4892afab803ccc1f54542f1bb3340f
Author: Tian Gao <[email protected]>
AuthorDate: Mon Mar 2 14:30:27 2026 +0800

    [SPARK-55663][PYTHON] Unify __module__ for data source functions
    
    ### What changes were proposed in this pull request?
    
    Always set `__module__` to a meaningful value for data source functions and workers.
    
    ### Why are the changes needed?
    
    The data source profiler depends on the module name of the worker/function. When invoked with the simple worker, `__module__` is `__main__`, which is not informative enough for the profilers. We should avoid leaving it as `__main__`.
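
A minimal sketch of the function-side idea, not the code in this commit: `__module__` is an ordinary writable attribute on a Python function, so it can be pinned to a stable dotted name regardless of how the defining file was launched. The helper `make_read_func` and the variable names are hypothetical; shortening the name with `split(".")[-1]` is borrowed from `utils.py` and is only an assumption about how a consumer might use it.

```python
def make_read_func():
    """Build a nested function, standing in for a data source read function."""

    def data_source_read_func(iterator):
        return iterator

    return data_source_read_func


func = make_read_func()

# Whatever module defined the function; this is "__main__" when the
# defining file is executed as the main program.
defined_in = func.__module__

# Pin the attribute to a stable dotted name (illustrative value).
func.__module__ = "pyspark.sql.worker.plan_data_source_read"

# A consumer keyed on the last path component now sees a meaningful label.
label = func.__module__.split(".")[-1]
print(defined_in, "->", label)
```

Reassigning `__module__` changes only the reported name; it does not move the function or affect how it is called.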
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested locally with the simple worker: the profiler recognizes the data source worker/function.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54457 from gaogaotiantian/unify-module-name.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/worker/plan_data_source_read.py  |  5 +++++
 python/pyspark/sql/worker/utils.py                  | 10 +++++++++-
 python/pyspark/sql/worker/write_into_data_source.py |  5 +++++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/worker/plan_data_source_read.py b/python/pyspark/sql/worker/plan_data_source_read.py
index c858a99462b1..8ea746bf6916 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -219,6 +219,11 @@ def write_read_func_and_partitions(
 
         return records_to_arrow_batches(output_iter, max_arrow_batch_size, return_type, data_source)
 
+    # Set the module name so UDF worker can recognize that this is a data source function.
+    # This is needed when simple worker is used because the __module__ will be set to
+    # __main__, which confuses the profiler logic.
+    data_source_read_func.__module__ = "pyspark.sql.worker.plan_data_source_read"
+
     command = (data_source_read_func, return_type)
     pickleSer._write_with_length(command, outfile)
 
diff --git a/python/pyspark/sql/worker/utils.py b/python/pyspark/sql/worker/utils.py
index 8a99abe3e4e9..406894fc275a 100644
--- a/python/pyspark/sql/worker/utils.py
+++ b/python/pyspark/sql/worker/utils.py
@@ -70,7 +70,15 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:
             SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
         )
 
-        worker_module = main.__module__.split(".")[-1]
+        if main.__module__ == "__main__":
+            try:
+                worker_module = sys.modules["__main__"].__spec__.name  # type: ignore[union-attr]
+            except Exception:
+                worker_module = "__main__"
+        else:
+            worker_module = main.__module__
+        worker_module = worker_module.split(".")[-1]
+
         if conf.profiler == "perf":
             with WorkerPerfProfiler(accumulator, worker_module):
                 main(infile, outfile)
diff --git a/python/pyspark/sql/worker/write_into_data_source.py b/python/pyspark/sql/worker/write_into_data_source.py
index 83bdedb2fdbe..7498e60fa6a4 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -224,6 +224,11 @@ def _main(infile: IO, outfile: IO) -> None:
         messages = pa.array([pickled])
         yield pa.record_batch([messages], names=[return_col_name])
 
+    # Set the module name so UDF worker can recognize that this is a data source function.
+    # This is needed when simple worker is used because the __module__ will be set to
+    # __main__, which confuses the profiler logic.
+    data_source_write_func.__module__ = "pyspark.sql.worker.write_into_data_source"
+
     # Return the pickled write UDF.
     command = (data_source_write_func, return_type)
     pickleSer._write_with_length(command, outfile)
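
The `utils.py` hunk above can be exercised in isolation with a sketch like the following. It is not the Spark code itself: `resolve_worker_module`, its `modules` parameter, and the fake module objects are hypothetical, introduced only so the three cases (launched as the main program with a module spec, run as a plain script without one, and imported normally) can be checked without starting a worker.

```python
import sys
import types
from importlib.machinery import ModuleSpec
from typing import Callable, Mapping


def resolve_worker_module(
    main: Callable, modules: Mapping[str, types.ModuleType] = sys.modules
) -> str:
    """Return the last component of the module that `main` really lives in."""
    if main.__module__ == "__main__":
        try:
            # When the main module carries a spec, it records the real dotted name.
            full_name = modules["__main__"].__spec__.name
        except Exception:
            # No usable spec (for example a plain script): keep "__main__".
            full_name = "__main__"
    else:
        full_name = main.__module__
    return full_name.split(".")[-1]


def fake_main(infile, outfile):
    """Stand-in for a worker entry point."""


# Case 1: main module with a spec naming the real module.
fake_main.__module__ = "__main__"
with_spec = types.ModuleType("__main__")
with_spec.__spec__ = ModuleSpec("pyspark.sql.worker.plan_data_source_read", loader=None)
from_spec = resolve_worker_module(fake_main, {"__main__": with_spec})

# Case 2: main module without a spec, so the fallback applies.
without_spec = types.ModuleType("__main__")
from_fallback = resolve_worker_module(fake_main, {"__main__": without_spec})

# Case 3: the entry point was imported under its normal dotted name.
fake_main.__module__ = "pyspark.sql.worker.write_into_data_source"
from_import = resolve_worker_module(fake_main, {})

print(from_spec, from_fallback, from_import)
```

Passing the module table as a parameter is a choice made for this sketch so the lookup can be tested with stand-in modules; the committed code reads `sys.modules` directly.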


