This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 66e13120698 [SPARK-42197][CONNECT] Reuses JVM initialization, and separates configuration groups to set in remote local mode
66e13120698 is described below
commit 66e131206987589b2b961b9c5f3bc57154b16cb3
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Jan 26 20:57:00 2023 +0900
[SPARK-42197][CONNECT] Reuses JVM initialization, and separates configuration groups to set in remote local mode
### What changes were proposed in this pull request?
This PR proposes to refactor `_start_connect_server` by:
1. Reusing `SparkContext._ensure_initialized`
2. Separating the configurations into a group that is always overwritten and a group that is set only if unset (see the sketch after this list)
3. Piggybacking a fix that removes `connect_not_compiled_message`, which is effectively useless because the Spark Connect jars are always compiled together by the default `sbt package` or `mvn package`
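To make the second point concrete, here is a minimal standalone sketch of the split. The `overwrite_conf`/`default_conf` grouping and the set-if-unset rule mirror the `create_conf` helper added in the diff below; the function name `_example_create_conf`, its signature, and the fixed `loadDefaults=False` are illustrative assumptions, not the actual patch:

```python
from typing import Any, Dict

from pyspark import SparkConf


def _example_create_conf(opts: Dict[str, Any], master: str) -> SparkConf:
    # Group 1: configurations that are always overwritten, i.e. user-supplied
    # options plus the settings remote local mode requires unconditionally.
    overwrite_conf = dict(opts)
    overwrite_conf["spark.master"] = master
    overwrite_conf["spark.local.connect"] = "1"

    # Group 2: defaults, applied only when the key is not set yet.
    default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}

    # Build the SparkConf: overwrites first, then fill in missing defaults.
    conf = SparkConf(loadDefaults=False)
    for k, v in overwrite_conf.items():
        conf.set(k, v)
    for k, v in default_conf.items():
        if not conf.contains(k):
            conf.set(k, v)
    return conf
```

With this split, a `spark.plugins` value passed in `opts` wins because the default is applied only when the key is still unset, while `spark.master` and `spark.local.connect` are always forced.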
### Why are the changes needed?
To make the code easier to read.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests should cover them.
Closes #39751 from HyukjinKwon/cleanup-conf.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit b3f5f81f43fe2c54c575d4b013ee1f91c20542b3)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/session.py  | 62 ++++++++++++++++------------------
 python/pyspark/testing/connectutils.py | 17 ----------
 2 files changed, 29 insertions(+), 50 deletions(-)
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 1f11a30ade2..7769917e412 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -42,7 +42,6 @@ from pandas.api.types import ( # type: ignore[attr-defined]
 )
 
 from pyspark import SparkContext, SparkConf, __version__
-from pyspark.java_gateway import launch_gateway
 from pyspark.sql.connect.client import SparkConnectClient
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.connect.plan import SQL, Range, LocalRelation
@@ -456,7 +455,7 @@ class SparkSession:
     @staticmethod
     def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
         """
-        Starts the Spark Connect server given the master.
+        Starts the Spark Connect server given the master (thread-unsafe).
 
         At the high level, there are two cases. The first case is development case, e.g.,
         you locally build Apache Spark, and run ``SparkSession.builder.remote("local")``:
@@ -470,7 +469,7 @@ class SparkSession:
         3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
            into the current class loader. Otherwise, Spark Context with ``spark.plugins``
            cannot be initialized because the JVM is already running without the jars in
-           the class path before executing this Python process for driver side (in case of
+           the classpath before executing this Python process for driver side (in case of
            PySpark application submission).
 
         4. Starts a regular Spark session that automatically starts a Spark Connect server
@@ -492,23 +491,30 @@
         """
         session = PySparkSession._instantiatedSession
         if session is None or session._sc._jsc is None:
-            conf = SparkConf()
-            for k, v in opts.items():
-                conf.set(k, v)
-
-            # Do not need to worry about the existing configurations because
-            # Py4J gateway is not created yet, and `conf` instance is empty here.
-            # The configurations belows are manually manipulated later to respect
-            # the user-specified configuration first right after Py4J gateway creation.
-            conf.set("spark.master", master)
-            conf.set("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
-            conf.set("spark.local.connect", "1")
+
+            # Configurations to be overwritten
+            overwrite_conf = opts
+            overwrite_conf["spark.master"] = master
+            overwrite_conf["spark.local.connect"] = "1"
+
+            # Configurations to be set if unset.
+            default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}
+
+            def create_conf(**kwargs: Any) -> SparkConf:
+                conf = SparkConf(**kwargs)
+                for k, v in overwrite_conf.items():
+                    conf.set(k, v)
+                for k, v in default_conf.items():
+                    if not conf.contains(k):
+                        conf.set(k, v)
+                return conf
 
             # Check if we're using unreleased version that is in development.
             # Also checks SPARK_TESTING for RC versions.
             is_dev_mode = (
                 "dev" in LooseVersion(__version__).version or "SPARK_TESTING" in os.environ
             )
+
             origin_remote = os.environ.get("SPARK_REMOTE", None)
             try:
                 if origin_remote is not None:
@@ -516,7 +522,8 @@ class SparkSession:
                     # start the regular PySpark session.
                     del os.environ["SPARK_REMOTE"]
 
-                connect_jar = None
+                SparkContext._ensure_initialized(conf=create_conf(loadDefaults=False))
+
                 if is_dev_mode:
                     # Try and catch for a possibility in production because pyspark.testing
                     # does not exist in the canonical release.
@@ -536,29 +543,18 @@
                                 " was not found. Manually locate the jars and specify them, e.g., "
                                 "'spark.jars' configuration."
                             )
+                        else:
+                            pyutils = SparkContext._jvm.PythonSQLUtils  # type: ignore[union-attr]
+                            pyutils.addJarToCurrentClassLoader(connect_jar)
+
                     except ImportError:
                         pass
 
-                # Note that JVM is already up at this point in the case of Python
-                # application submission.
-                with SparkContext._lock:
-                    if not SparkContext._gateway:
-                        SparkContext._gateway = launch_gateway(conf)
-                        SparkContext._jvm = SparkContext._gateway.jvm
-                        if connect_jar is not None:
-                            SparkContext._jvm.PythonSQLUtils.addJarToCurrentClassLoader(connect_jar)
-
-                # Now, JVM is up, and respect the default set.
-                prev = conf
-                conf = SparkConf(_jvm=SparkContext._jvm)
-                conf.set("spark.master", master)
-                for k, v in prev.getAll():
-                    if not conf.contains(k):
-                        conf.set(k, v)
-
                 # The regular PySpark session is registered as an active session
                 # so would not be garbage-collected.
-                PySparkSession(SparkContext.getOrCreate(conf))
+                PySparkSession(
+                    SparkContext.getOrCreate(create_conf(loadDefaults=True, _jvm=SparkContext._jvm))
+                )
             finally:
                 if origin_remote is not None:
                     os.environ["SPARK_REMOTE"] = origin_remote
diff --git a/python/pyspark/testing/connectutils.py b/python/pyspark/testing/connectutils.py
index 210d525ade7..7bc51902490 100644
--- a/python/pyspark/testing/connectutils.py
+++ b/python/pyspark/testing/connectutils.py
@@ -25,7 +25,6 @@ from pyspark import Row, SparkConf
 from pyspark.testing.utils import PySparkErrorTestUtils
 from pyspark.testing.sqlutils import (
     have_pandas,
-    have_pyarrow,
     pandas_requirement_message,
     pyarrow_requirement_message,
     SQLTestUtils,
@@ -55,27 +54,11 @@ except ImportError as e:
     googleapis_common_protos_requirement_message = str(e)
 have_googleapis_common_protos = googleapis_common_protos_requirement_message is None
 
-if (
-    have_pandas
-    and have_pyarrow
-    and have_grpc
-    and have_grpc_status
-    and have_googleapis_common_protos
-):
-    connect_not_compiled_message = None
-else:
-    connect_not_compiled_message = (
-        "Skipping all Spark Connect Python tests as the optional Spark Connect project was "
-        "not compiled into a JAR. To run these tests, you need to build Spark with "
-        "'build/sbt package' or 'build/mvn package' before running this test."
-    )
-
 
 connect_requirement_message = (
     pandas_requirement_message
     or pyarrow_requirement_message
     or grpc_requirement_message
-    or connect_not_compiled_message
     or googleapis_common_protos_requirement_message
     or grpc_status_requirement_message
 )
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]