This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19fbdaa54cd7 [SPARK-49142][CONNECT][PYTHON] Follow up to revert proto
to string performance cost
19fbdaa54cd7 is described below
commit 19fbdaa54cd783aec988ff480bc190e983a48d1b
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Fri Aug 9 09:11:39 2024 +0900
[SPARK-49142][CONNECT][PYTHON] Follow up to revert proto to string
performance cost
### What changes were proposed in this pull request?
In a previous commit a713a66e, log level in the connect client
was lowered from info to debug. As part of that, the explicit
check for logging level was removed.
Revert this change and put the logging level check back.
### Why are the changes needed?
The explicit check was there so as to not convert the proto
to a string, done during string interpolation, when not at the
expected log level. Conversion from proto to string can be
expensive for some proto messages.
### Does this PR introduce _any_ user-facing change?
A previously reverted performance improvement is now
put back in place.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47667 from nija-at/log-debug.
Authored-by: Niranjan Jayakar <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 54 ++++++++++++++++++++++++-------
1 file changed, 43 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py
b/python/pyspark/sql/connect/client/core.py
index 86e5aad1b2c4..723a11b35c26 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -24,6 +24,7 @@ from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
+import logging
import threading
import os
import copy
@@ -863,7 +864,10 @@ class SparkConnectClient(object):
"""
Return given plan as a PyArrow Table iterator.
"""
- logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
with Progress(handlers=self._progress_handlers,
operation_id=req.operation_id) as progress:
@@ -879,7 +883,10 @@ class SparkConnectClient(object):
"""
Return given plan as a PyArrow Table.
"""
- logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
table, schema, metrics, observed_metrics, _ =
self._execute_and_fetch(req, observations)
@@ -895,7 +902,10 @@ class SparkConnectClient(object):
"""
Return given plan as a pandas DataFrame.
"""
- logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
(self_destruct_conf,) = self.get_config_with_defaults(
@@ -1045,7 +1055,10 @@ class SparkConnectClient(object):
"""
Return schema for given plan.
"""
- logger.debug(f"Schema for plan: {self._proto_to_string(plan, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"Schema for plan: {self._proto_to_string(plan,
True)}")
schema = self._analyze(method="schema", plan=plan).schema
assert schema is not None
# Server side should populate the struct field which is the schema.
@@ -1056,7 +1069,12 @@ class SparkConnectClient(object):
"""
Return explain string for given plan.
"""
- logger.debug(f"Explain (mode={explain_mode}) for plan
{self._proto_to_string(plan, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(
+ f"Explain (mode={explain_mode}) for plan
{self._proto_to_string(plan, True)}"
+ )
result = self._analyze(
method="explain", plan=plan, explain_mode=explain_mode
).explain_string
@@ -1069,7 +1087,10 @@ class SparkConnectClient(object):
"""
Execute given command.
"""
- logger.debug(f"Execute command for command
{self._proto_to_string(command, True)}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"Execute command for command
{self._proto_to_string(command, True)}")
req = self._execute_plan_request_with_metadata()
if self._user_id:
req.user_context.user_id = self._user_id
@@ -1090,9 +1111,12 @@ class SparkConnectClient(object):
"""
Execute given command. Similar to execute_command, but the value is
returned using yield.
"""
- logger.debug(
- f"Execute command as iterator for command
{self._proto_to_string(command, True)}"
- )
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(
+ f"Execute command as iterator for command
{self._proto_to_string(command, True)}"
+ )
req = self._execute_plan_request_with_metadata()
if self._user_id:
req.user_context.user_id = self._user_id
@@ -1311,7 +1335,10 @@ class SparkConnectClient(object):
Dict[str, Any],
]
]:
- logger.debug(f"ExecuteAndFetchAsIterator. Request: {req}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(f"ExecuteAndFetchAsIterator. Request:
{self._proto_to_string(req)}")
num_records = 0
@@ -1330,7 +1357,12 @@ class SparkConnectClient(object):
nonlocal num_records
# The session ID is the local session ID and should match what we
expect.
self._verify_response_integrity(b)
- logger.debug(f"ExecuteAndFetchAsIterator. Response received: {b}")
+ if logger.isEnabledFor(logging.DEBUG):
+ # inside an if statement to not incur a performance cost
converting proto to string
+ # when not at debug log level.
+ logger.debug(
+ f"ExecuteAndFetchAsIterator. Response received:
{self._proto_to_string(b)}"
+ )
if b.HasField("metrics"):
logger.debug("Received metric batch.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]