This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a713a66e1c4f [SPARK-49142][CONNECT][PYTHON] Lower Spark Connect client log level to debug
a713a66e1c4f is described below
commit a713a66e1c4f98eea6818e8c81d3bbfb0a1eb947
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Thu Aug 8 17:46:35 2024 +0900
[SPARK-49142][CONNECT][PYTHON] Lower Spark Connect client log level to debug
### What changes were proposed in this pull request?
The Spark Connect client produces a fair amount of logs, but they are
incorrectly emitted at the INFO level. When clients set the global log level
to INFO, they are flooded with what is effectively debug-level output.
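As a minimal sketch of the effect (the logger name below is an assumption;
the client's actual logger configuration may differ):

    import logging

    # A typical client-side setup: show INFO and above from every library.
    logging.basicConfig(level=logging.INFO)

    logger = logging.getLogger("pyspark.sql.connect.client")  # assumed name

    logger.info("Executing plan ...")   # before this change: printed on every request
    logger.debug("Executing plan ...")  # after this change: hidden unless DEBUG is enabled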
### Why are the changes needed?
Since the client uses Python's standard logging utility, clients and even
other libraries can set the effective log level. When this occurs, the Spark
Connect client emits a lot of spurious debug-level messages.
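A sketch of the mechanism: Python logging levels apply process-wide per
logger hierarchy, so any code in the process can raise the verbosity for
every library at once:

    import logging

    # A client, or any third-party library, may do this at import time:
    logging.basicConfig(level=logging.INFO)

    # From then on, every logger.info(...) call in the process reaches the
    # handlers, including the Spark Connect client's per-request messages;
    # moving those calls to logger.debug(...) keeps them quiet by default.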
### Does this PR introduce _any_ user-facing change?
Yes. Spark Connect client logs are now emitted at the DEBUG level rather than
the INFO level.
### How was this patch tested?
Manually, by changing the log levels and verifying that the messages are
emitted at the right level.
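A sketch of such a manual check, assuming a Spark Connect server is reachable
at sc://localhost (the endpoint is illustrative):

    import logging
    from pyspark.sql import SparkSession

    logging.basicConfig(level=logging.INFO)
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    spark.range(10).collect()  # at INFO: no "Executing plan ..." client chatter

    logging.getLogger().setLevel(logging.DEBUG)
    spark.range(10).collect()  # at DEBUG: the client's plan/request logs appear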
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47647 from nija-at/log-debug.
Authored-by: Niranjan Jayakar <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 45 +++++++++++++------------------
1 file changed, 19 insertions(+), 26 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 53d1412e9d05..86e5aad1b2c4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -24,7 +24,6 @@ from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
-import logging
import threading
import os
import copy
@@ -194,7 +193,7 @@ class ChannelBuilder:
channel = grpc.insecure_channel(target, options=self._channel_options, **kwargs)
if len(self._interceptors) > 0:
- logger.info(f"Applying interceptors ({self._interceptors})")
+ logger.debug(f"Applying interceptors ({self._interceptors})")
channel = grpc.intercept_channel(channel, *self._interceptors)
return channel
@@ -202,7 +201,7 @@ class ChannelBuilder:
channel = grpc.secure_channel(target, credentials, options=self._channel_options, **kwargs)
if len(self._interceptors) > 0:
- logger.info(f"Applying interceptors ({self._interceptors})")
+ logger.debug(f"Applying interceptors ({self._interceptors})")
channel = grpc.intercept_channel(channel, *self._interceptors)
return channel
@@ -846,7 +845,7 @@ class SparkConnectClient(object):
)
def _resources(self) -> Dict[str, ResourceInformation]:
- logger.info("Fetching the resources")
+ logger.debug("Fetching the resources")
cmd = pb2.Command()
cmd.get_resources_command.SetInParent()
(_, properties, _) = self.execute_command(cmd)
@@ -864,8 +863,7 @@ class SparkConnectClient(object):
"""
Return given plan as a PyArrow Table iterator.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+ logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
with Progress(handlers=self._progress_handlers, operation_id=req.operation_id) as progress:
@@ -881,8 +879,7 @@ class SparkConnectClient(object):
"""
Return given plan as a PyArrow Table.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+ logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
@@ -898,8 +895,7 @@ class SparkConnectClient(object):
"""
Return given plan as a pandas DataFrame.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+ logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
(self_destruct_conf,) = self.get_config_with_defaults(
@@ -1049,8 +1045,7 @@ class SparkConnectClient(object):
"""
Return schema for given plan.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(f"Schema for plan: {self._proto_to_string(plan,
True)}")
+ logger.debug(f"Schema for plan: {self._proto_to_string(plan, True)}")
schema = self._analyze(method="schema", plan=plan).schema
assert schema is not None
# Server side should populate the struct field which is the schema.
@@ -1061,10 +1056,7 @@ class SparkConnectClient(object):
"""
Return explain string for given plan.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(
- f"Explain (mode={explain_mode}) for plan
{self._proto_to_string(plan, True)}"
- )
+ logger.debug(f"Explain (mode={explain_mode}) for plan
{self._proto_to_string(plan, True)}")
result = self._analyze(
method="explain", plan=plan, explain_mode=explain_mode
).explain_string
@@ -1077,8 +1069,7 @@ class SparkConnectClient(object):
"""
Execute given command.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(f"Execute command for command
{self._proto_to_string(command, True)}")
+ logger.debug(f"Execute command for command
{self._proto_to_string(command, True)}")
req = self._execute_plan_request_with_metadata()
if self._user_id:
req.user_context.user_id = self._user_id
@@ -1099,10 +1090,9 @@ class SparkConnectClient(object):
"""
Execute given command. Similar to execute_command, but the value is
returned using yield.
"""
- if logger.isEnabledFor(logging.INFO):
- logger.info(
- f"Execute command as iterator for command
{self._proto_to_string(command, True)}"
- )
+ logger.debug(
+ f"Execute command as iterator for command
{self._proto_to_string(command, True)}"
+ )
req = self._execute_plan_request_with_metadata()
if self._user_id:
req.user_context.user_id = self._user_id
@@ -1286,7 +1276,7 @@ class SparkConnectClient(object):
Proto representation of the plan.
"""
- logger.info("Execute")
+ logger.debug("Execute")
def handle_response(b: pb2.ExecutePlanResponse) -> None:
self._verify_response_integrity(b)
@@ -1321,7 +1311,7 @@ class SparkConnectClient(object):
Dict[str, Any],
]
]:
- logger.info("ExecuteAndFetchAsIterator")
+ logger.debug(f"ExecuteAndFetchAsIterator. Request: {req}")
num_records = 0
@@ -1340,6 +1330,8 @@ class SparkConnectClient(object):
nonlocal num_records
# The session ID is the local session ID and should match what we expect.
self._verify_response_integrity(b)
+ logger.debug(f"ExecuteAndFetchAsIterator. Response received: {b}")
+
if b.HasField("metrics"):
logger.debug("Received metric batch.")
yield from self._build_metrics(b.metrics)
@@ -1470,7 +1462,7 @@ class SparkConnectClient(object):
List[PlanObservedMetrics],
Dict[str, Any],
]:
- logger.info("ExecuteAndFetch")
+ logger.debug("ExecuteAndFetch")
observed_metrics: List[PlanObservedMetrics] = []
metrics: List[PlanMetrics] = []
@@ -1797,6 +1789,7 @@ class SparkConnectClient(object):
if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
info = error_details_pb2.ErrorInfo()
d.Unpack(info)
+ logger.debug(f"Received ErrorInfo: {info}")
if info.metadata["errorClass"] == "INVALID_HANDLE.SESSION_CHANGED":
self._closed = True
@@ -1876,7 +1869,7 @@ class SparkConnectClient(object):
def _create_profile(self, profile: pb2.ResourceProfile) -> int:
"""Create the ResourceProfile on the server side and return the
profile ID"""
- logger.info("Creating the ResourceProfile")
+ logger.debug("Creating the ResourceProfile")
cmd = pb2.Command()
cmd.create_resource_profile_command.profile.CopyFrom(profile)
(_, properties, _) = self.execute_command(cmd)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]