This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a713a66e1c4f [SPARK-49142][CONNECT][PYTHON] Lower Spark Connect client 
log level to debug
a713a66e1c4f is described below

commit a713a66e1c4f98eea6818e8c81d3bbfb0a1eb947
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Thu Aug 8 17:46:35 2024 +0900

    [SPARK-49142][CONNECT][PYTHON] Lower Spark Connect client log level to debug
    
    ### What changes were proposed in this pull request?
    
    Spark Connect client produces a fair amount of logs, but they are incorrectly
    configured at INFO level. As a result, when clients set the global log level
    to INFO, a lot of debug-level log messages are produced.
    
    ### Why are the changes needed?
    
    Since we're using the Python logging utility, it's possible for clients and
    even other libraries to set the target log level.
    When this occurs, a lot of spurious debug log messages are emitted
    by the Spark Connect client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Spark Connect client logs will now be produced at debug level
    and not info level.
    
    ### How was this patch tested?
    
    Manually by changing the log levels to see that the messages are emitted
    at the right level.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47647 from nija-at/log-debug.
    
    Authored-by: Niranjan Jayakar <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client/core.py | 45 +++++++++++++------------------
 1 file changed, 19 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 53d1412e9d05..86e5aad1b2c4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -24,7 +24,6 @@ from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
 
-import logging
 import threading
 import os
 import copy
@@ -194,7 +193,7 @@ class ChannelBuilder:
         channel = grpc.insecure_channel(target, options=self._channel_options, 
**kwargs)
 
         if len(self._interceptors) > 0:
-            logger.info(f"Applying interceptors ({self._interceptors})")
+            logger.debug(f"Applying interceptors ({self._interceptors})")
             channel = grpc.intercept_channel(channel, *self._interceptors)
         return channel
 
@@ -202,7 +201,7 @@ class ChannelBuilder:
         channel = grpc.secure_channel(target, credentials, 
options=self._channel_options, **kwargs)
 
         if len(self._interceptors) > 0:
-            logger.info(f"Applying interceptors ({self._interceptors})")
+            logger.debug(f"Applying interceptors ({self._interceptors})")
             channel = grpc.intercept_channel(channel, *self._interceptors)
         return channel
 
@@ -846,7 +845,7 @@ class SparkConnectClient(object):
         )
 
     def _resources(self) -> Dict[str, ResourceInformation]:
-        logger.info("Fetching the resources")
+        logger.debug("Fetching the resources")
         cmd = pb2.Command()
         cmd.get_resources_command.SetInParent()
         (_, properties, _) = self.execute_command(cmd)
@@ -864,8 +863,7 @@ class SparkConnectClient(object):
         """
         Return given plan as a PyArrow Table iterator.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+        logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
         with Progress(handlers=self._progress_handlers, 
operation_id=req.operation_id) as progress:
@@ -881,8 +879,7 @@ class SparkConnectClient(object):
         """
         Return given plan as a PyArrow Table.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+        logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
         table, schema, metrics, observed_metrics, _ = 
self._execute_and_fetch(req, observations)
@@ -898,8 +895,7 @@ class SparkConnectClient(object):
         """
         Return given plan as a pandas DataFrame.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(f"Executing plan {self._proto_to_string(plan, True)}")
+        logger.debug(f"Executing plan {self._proto_to_string(plan, True)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
         (self_destruct_conf,) = self.get_config_with_defaults(
@@ -1049,8 +1045,7 @@ class SparkConnectClient(object):
         """
         Return schema for given plan.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(f"Schema for plan: {self._proto_to_string(plan, 
True)}")
+        logger.debug(f"Schema for plan: {self._proto_to_string(plan, True)}")
         schema = self._analyze(method="schema", plan=plan).schema
         assert schema is not None
         # Server side should populate the struct field which is the schema.
@@ -1061,10 +1056,7 @@ class SparkConnectClient(object):
         """
         Return explain string for given plan.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(
-                f"Explain (mode={explain_mode}) for plan 
{self._proto_to_string(plan, True)}"
-            )
+        logger.debug(f"Explain (mode={explain_mode}) for plan 
{self._proto_to_string(plan, True)}")
         result = self._analyze(
             method="explain", plan=plan, explain_mode=explain_mode
         ).explain_string
@@ -1077,8 +1069,7 @@ class SparkConnectClient(object):
         """
         Execute given command.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(f"Execute command for command 
{self._proto_to_string(command, True)}")
+        logger.debug(f"Execute command for command 
{self._proto_to_string(command, True)}")
         req = self._execute_plan_request_with_metadata()
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -1099,10 +1090,9 @@ class SparkConnectClient(object):
         """
         Execute given command. Similar to execute_command, but the value is 
returned using yield.
         """
-        if logger.isEnabledFor(logging.INFO):
-            logger.info(
-                f"Execute command as iterator for command 
{self._proto_to_string(command, True)}"
-            )
+        logger.debug(
+            f"Execute command as iterator for command 
{self._proto_to_string(command, True)}"
+        )
         req = self._execute_plan_request_with_metadata()
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -1286,7 +1276,7 @@ class SparkConnectClient(object):
             Proto representation of the plan.
 
         """
-        logger.info("Execute")
+        logger.debug("Execute")
 
         def handle_response(b: pb2.ExecutePlanResponse) -> None:
             self._verify_response_integrity(b)
@@ -1321,7 +1311,7 @@ class SparkConnectClient(object):
             Dict[str, Any],
         ]
     ]:
-        logger.info("ExecuteAndFetchAsIterator")
+        logger.debug(f"ExecuteAndFetchAsIterator. Request: {req}")
 
         num_records = 0
 
@@ -1340,6 +1330,8 @@ class SparkConnectClient(object):
             nonlocal num_records
             # The session ID is the local session ID and should match what we 
expect.
             self._verify_response_integrity(b)
+            logger.debug(f"ExecuteAndFetchAsIterator. Response received: {b}")
+
             if b.HasField("metrics"):
                 logger.debug("Received metric batch.")
                 yield from self._build_metrics(b.metrics)
@@ -1470,7 +1462,7 @@ class SparkConnectClient(object):
         List[PlanObservedMetrics],
         Dict[str, Any],
     ]:
-        logger.info("ExecuteAndFetch")
+        logger.debug("ExecuteAndFetch")
 
         observed_metrics: List[PlanObservedMetrics] = []
         metrics: List[PlanMetrics] = []
@@ -1797,6 +1789,7 @@ class SparkConnectClient(object):
                 if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                     info = error_details_pb2.ErrorInfo()
                     d.Unpack(info)
+                    logger.debug(f"Received ErrorInfo: {info}")
 
                     if info.metadata["errorClass"] == 
"INVALID_HANDLE.SESSION_CHANGED":
                         self._closed = True
@@ -1876,7 +1869,7 @@ class SparkConnectClient(object):
 
     def _create_profile(self, profile: pb2.ResourceProfile) -> int:
         """Create the ResourceProfile on the server side and return the 
profile ID"""
-        logger.info("Creating the ResourceProfile")
+        logger.debug("Creating the ResourceProfile")
         cmd = pb2.Command()
         cmd.create_resource_profile_command.profile.CopyFrom(profile)
         (_, properties, _) = self.execute_command(cmd)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to