This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 555aece74d2 [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
555aece74d2 is described below
commit 555aece74d2a22d312e815ec07f5553800e14b9d
Author: Alice Sayutina <[email protected]>
AuthorDate: Fri Sep 22 12:31:23 2023 +0900
[SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
### What changes were proposed in this pull request?
Add error logging to `addArtifact` (see the example under "How was this patch tested?"). The logging code is moved into a separate file to avoid a circular dependency.
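For context, here is the cycle the move avoids (inferred from the imports visible in the diff below): `core.py` already imports `ArtifactManager` from `artifact.py`, so `artifact.py` could not import a logger defined in `core.py`. With the logger in its own module, both sides import it directly:
```python
# Shared by artifact.py and core.py after this change; no import cycle,
# since logging.py depends on neither of them.
from pyspark.sql.connect.client.logging import logger
```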
### Why are the changes needed?
Currently, when `addArtifact` is called with a file that doesn't exist, the user gets a cryptic error:
```
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating requests!"
    debug_error_string = "None"
>
```
This is hard to debug without digging deep into the internals.
This happens because `addArtifact` is implemented as a client-side streaming RPC, and the actual error occurs while gRPC consumes the iterator that generates the requests. Unfortunately, gRPC doesn't print any debug information that would help the user understand the problem.
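To illustrate, here is a self-contained sketch (no gRPC needed; `fake_rpc` and the generator names are hypothetical stand-ins, not pyspark or grpc APIs) of why the original traceback disappears and how wrapping the request generator, as this PR does, surfaces it:
```python
import logging

# Mirror the client's log format (see _configure_logging in the diff below).
logging.basicConfig(format="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
logger = logging.getLogger("sketch")
logger.setLevel(logging.ERROR)


def fake_rpc(requests):
    # Stand-in for gRPC consuming a client-side request stream: the original
    # traceback is swallowed and replaced with an opaque error.
    try:
        for _ in requests:
            pass
    except Exception:
        raise RuntimeError('details = "Exception iterating requests!"') from None


def _create_requests():
    # Stand-in for the request generator; fails like a missing artifact file.
    raise FileNotFoundError("[Errno 2] No such file or directory: 'XYZ'")
    yield  # never reached; makes this function a generator


def _create_requests_logged():
    # The fix: log the real error before gRPC hides it, then re-raise.
    try:
        yield from _create_requests()
    except Exception as e:
        logger.error(f"Failed to execute addArtifact: {e}")
        raise


try:
    fake_rpc(_create_requests_logged())
except RuntimeError as e:
    print(e)  # the ERROR log line above now explains the opaque RPC failure
```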
### Does this PR introduce _any_ user-facing change?
Additional logging only, which is opt-in in the same way as before, via the `SPARK_CONNECT_LOG_LEVEL` environment variable.
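For example, a minimal sketch of enabling it (the logger is configured once at import time, so the variable must be set before the Spark Connect client modules are imported):
```python
import os

# Must be set before pyspark.sql.connect.client is imported; the logger
# is configured once at module import time (see logging.py in the diff).
os.environ["SPARK_CONNECT_LOG_LEVEL"] = "ERROR"

from pyspark.sql.connect.client import getLogLevel

print(getLogLevel())  # 40 (logging.ERROR); None when the variable is unset
```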
### How was this patch tested?
```
>>> s.addArtifact("XYZ", file=True)
[New:] 2023-09-15 17:06:40,078 11789 ERROR _create_requests Failed to execute addArtifact: [Errno 2] No such file or directory: '/Users/alice.sayutina/apache_spark/python/XYZ'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/alice.sayutina/apache_spark/python/pyspark/sql/connect/session.py", line 743, in addArtifacts
    self._client.add_artifacts(*path, pyfile=pyfile, archive=archive, file=file)
[....]
  File "/Users/alice.sayutina/oss-venv/lib/python3.11/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating requests!"
    debug_error_string = "None"
>
```
Closes #42949 from cdkrot/SPARK-45093.
Lead-authored-by: Alice Sayutina <[email protected]>
Co-authored-by: Alice Sayutina <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/__init__.py | 1 +
python/pyspark/sql/connect/client/artifact.py | 16 +++++--
python/pyspark/sql/connect/client/core.py | 38 +----------------
python/pyspark/sql/connect/client/logging.py | 60 +++++++++++++++++++++++++++
4 files changed, 74 insertions(+), 41 deletions(-)
diff --git a/python/pyspark/sql/connect/client/__init__.py b/python/pyspark/sql/connect/client/__init__.py
index 469d1c519a5..38523352e5b 100644
--- a/python/pyspark/sql/connect/client/__init__.py
+++ b/python/pyspark/sql/connect/client/__init__.py
@@ -20,3 +20,4 @@ from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
from pyspark.sql.connect.client.core import * # noqa: F401,F403
+from pyspark.sql.connect.client.logging import getLogLevel # noqa: F401
diff --git a/python/pyspark/sql/connect/client/artifact.py b/python/pyspark/sql/connect/client/artifact.py
index c858768ccbf..fb31a57e0f6 100644
--- a/python/pyspark/sql/connect/client/artifact.py
+++ b/python/pyspark/sql/connect/client/artifact.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies
+from pyspark.sql.connect.client.logging import logger
check_dependencies(__name__)
@@ -243,11 +244,18 @@ class ArtifactManager:
self, *path: str, pyfile: bool, archive: bool, file: bool
) -> Iterator[proto.AddArtifactsRequest]:
"""Separated for the testing purpose."""
-        return self._add_artifacts(
-            chain(
-                *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+        try:
+            yield from self._add_artifacts(
+                chain(
+                    *(
+                        self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file)
+                        for p in path
+                    )
+                )
             )
-        )
+        except Exception as e:
+            logger.error(f"Failed to submit addArtifacts request: {e}")
+            raise
def _retrieve_responses(
self, requests: Iterator[proto.AddArtifactsRequest]
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index ce6ea8ba3ee..34a867ce101 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -17,7 +17,6 @@
__all__ = [
"ChannelBuilder",
"SparkConnectClient",
- "getLogLevel",
]
from pyspark.sql.connect.utils import check_dependencies
@@ -25,7 +24,6 @@ from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
import threading
-import logging
import os
import platform
import random
@@ -66,6 +64,7 @@ from google.rpc import error_details_pb2
from pyspark.version import __version__
from pyspark.resource.information import ResourceInformation
from pyspark.sql.connect.client.artifact import ArtifactManager
+from pyspark.sql.connect.client.logging import logger
from pyspark.sql.connect.client.reattach import (
ExecutePlanResponseReattachableIterator,
RetryException,
@@ -100,41 +99,6 @@ if TYPE_CHECKING:
from pyspark.sql.connect._typing import DataTypeOrString
-def _configure_logging() -> logging.Logger:
- """Configure logging for the Spark Connect clients."""
- logger = logging.getLogger(__name__)
- handler = logging.StreamHandler()
- handler.setFormatter(
-        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
- )
- logger.addHandler(handler)
-
- # Check the environment variables for log levels:
- if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
- logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
- else:
- logger.disabled = True
- return logger
-
-
-# Instantiate the logger based on the environment configuration.
-logger = _configure_logging()
-
-
-def getLogLevel() -> Optional[int]:
- """
- This returns this log level as integer, or none (if no logging is enabled).
-
-    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
-
- .. versionadded:: 3.5.0
- """
-
- if not logger.disabled:
- return logger.level
- return None
-
-
class ChannelBuilder:
"""
    This is a helper class that is used to create a GRPC channel based on the given
diff --git a/python/pyspark/sql/connect/client/logging.py b/python/pyspark/sql/connect/client/logging.py
new file mode 100644
index 00000000000..7fdcfaca4cf
--- /dev/null
+++ b/python/pyspark/sql/connect/client/logging.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import logging
+import os
+from typing import Optional
+
+__all__ = [
+ "getLogLevel",
+]
+
+
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger(__name__)
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+ )
+ logger.addHandler(handler)
+
+ # Check the environment variables for log levels:
+ if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
+ logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
+ else:
+ logger.disabled = True
+ return logger
+
+
+# Instantiate the logger based on the environment configuration.
+logger = _configure_logging()
+
+
+def getLogLevel() -> Optional[int]:
+ """
+ This returns this log level as integer, or none (if no logging is enabled).
+
+    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
+
+ .. versionadded:: 3.5.0
+ """
+
+ if not logger.disabled:
+ return logger.level
+ return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]