This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6f87fe2f513d [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
6f87fe2f513d is described below
commit 6f87fe2f513d1b1a022f0d03b6c81d73d7cfb228
Author: Martin Grund <[email protected]>
AuthorDate: Fri Feb 2 08:49:06 2024 -0400
[SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
### What changes were proposed in this pull request?
This patch caches the result of the `df.schema` call in the DataFrame to
avoid the extra roundtrip to the Spark Connect service to retrieve the columns
or the schema. Since the Dataframe is immutable, the schema will not change.
### Why are the changes needed?
Performance / Stability
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT
Closes #42499 from grundprinzip/SPARK-44815.
Lead-authored-by: Martin Grund <[email protected]>
Co-authored-by: Herman van Hovell <[email protected]>
Co-authored-by: Martin Grund <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../jvm/src/main/scala/org/apache/spark/sql/Dataset.scala | 13 ++++++++++++-
python/pyspark/sql/connect/dataframe.py | 12 ++++++++++--
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 949f53409386..9a42afebf8f2 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -244,7 +244,18 @@ class Dataset[T] private[sql] (
* @group basic
* @since 3.4.0
*/
- def schema: StructType = {
+ def schema: StructType = cachedSchema
+
+ /**
+ * The cached schema.
+ *
+ * Schema caching is correct in most cases. Connect is lazy by nature. This means that we only
+ * resolve the plan when it is submitted for execution or analysis. We do not cache intermediate
+ * resolved plans. If the input (changes table, view redefinition, etc...) of the plan changes
+ * between the schema() call, and a subsequent action, the cached schema might be inconsistent
+ * with the end schema.
+ */
+ private lazy val cachedSchema: StructType = {
DataTypeProtoConverter
.toCatalystType(
sparkSession
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 6d37158142a6..4091016e0d59 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -120,6 +120,7 @@ class DataFrame:
# Check whether _repr_html is supported or not, we use it to avoid calling RPC twice
# by __repr__ and _repr_html_ while eager evaluation opens.
self._support_repr_html = False
+ self._cached_schema: Optional[StructType] = None
def __repr__(self) -> str:
if not self._support_repr_html:
@@ -1782,8 +1783,15 @@ class DataFrame:
@property
def schema(self) -> StructType:
- query = self._plan.to_proto(self._session.client)
- return self._session.client.schema(query)
+ # Schema caching is correct in most cases. Connect is lazy by nature. This means that
+ # we only resolve the plan when it is submitted for execution or analysis. We do not
+ # cache intermediate resolved plan. If the input (changes table, view redefinition,
+ # etc...) of the plan changes between the schema() call, and a subsequent action, the
+ # cached schema might be inconsistent with the end schema.
+ if self._cached_schema is None:
+ query = self._plan.to_proto(self._session.client)
+ self._cached_schema = self._session.client.schema(query)
+ return self._cached_schema
schema.__doc__ = PySparkDataFrame.schema.__doc__
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]