This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 66e13120698 [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode
66e13120698 is described below

commit 66e131206987589b2b961b9c5f3bc57154b16cb3
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Jan 26 20:57:00 2023 +0900

    [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to refactor `_start_connect_server` by:
    
    1. Reusing `SparkContext._ensure_initialized`
    2. Separating the configurations into a group that always overwrites the user-provided options and a group of defaults set only when unset (see the sketch after this list)
    3. Piggybacking a fix that removes `connect_not_compiled_message`, which is in fact useless because the Spark Connect jars are always compiled by the default `sbt package` or `mvn package` builds
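
    Below is a minimal sketch of the resulting split, adapted from the diff in this commit; the standalone `create_conf` helper and its `user_opts`/`master` parameters are illustrative, not the exact closure used in `session.py`:

    ```python
    from typing import Any, Dict

    from pyspark import SparkConf


    def create_conf(user_opts: Dict[str, Any], master: str) -> SparkConf:
        # Group 1: always overwritten, regardless of what the user passed.
        overwrite_conf = dict(user_opts)
        overwrite_conf["spark.master"] = master
        overwrite_conf["spark.local.connect"] = "1"

        # Group 2: defaults, applied only when the user has not set them already.
        default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}

        conf = SparkConf(loadDefaults=False)
        for k, v in overwrite_conf.items():
            conf.set(k, v)
        for k, v in default_conf.items():
            if not conf.contains(k):
                conf.set(k, v)
        return conf
    ```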
    
    ### Why are the changes needed?
    
    To make the code easier to read.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests should cover these changes.
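
    For illustration, the refactored path is the remote local mode described in the `_start_connect_server` docstring; a minimal, hypothetical way to exercise it manually (assuming a local build with the Spark Connect plugin jars available) would be:

    ```python
    from pyspark.sql import SparkSession

    # Creating a session with a "local" remote starts a local JVM running the
    # Spark Connect plugin and connects this Python client to it.
    spark = SparkSession.builder.remote("local").getOrCreate()
    spark.range(5).show()
    spark.stop()
    ```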
    
    Closes #39751 from HyukjinKwon/cleanup-conf.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit b3f5f81f43fe2c54c575d4b013ee1f91c20542b3)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/session.py  | 62 ++++++++++++++++------------------
 python/pyspark/testing/connectutils.py | 17 ----------
 2 files changed, 29 insertions(+), 50 deletions(-)

diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 1f11a30ade2..7769917e412 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -42,7 +42,6 @@ from pandas.api.types import (  # type: ignore[attr-defined]
 )
 
 from pyspark import SparkContext, SparkConf, __version__
-from pyspark.java_gateway import launch_gateway
 from pyspark.sql.connect.client import SparkConnectClient
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.connect.plan import SQL, Range, LocalRelation
@@ -456,7 +455,7 @@ class SparkSession:
     @staticmethod
     def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
         """
-        Starts the Spark Connect server given the master.
+        Starts the Spark Connect server given the master (thread-unsafe).
 
         At the high level, there are two cases. The first case is development case, e.g.,
         you locally build Apache Spark, and run ``SparkSession.builder.remote("local")``:
@@ -470,7 +469,7 @@ class SparkSession:
         3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
            into the current class loader. Otherwise, Spark Context with ``spark.plugins``
            cannot be initialized because the JVM is already running without the jars in
-           the class path before executing this Python process for driver side (in case of
+           the classpath before executing this Python process for driver side (in case of
            PySpark application submission).
 
         4. Starts a regular Spark session that automatically starts a Spark Connect server
@@ -492,23 +491,30 @@ class SparkSession:
         """
         session = PySparkSession._instantiatedSession
         if session is None or session._sc._jsc is None:
-            conf = SparkConf()
-            for k, v in opts.items():
-                conf.set(k, v)
-
-            # Do not need to worry about the existing configurations because
-            # Py4J gateway is not created yet, and `conf` instance is empty here.
-            # The configurations belows are manually manipulated later to respect
-            # the user-specified configuration first right after Py4J gateway creation.
-            conf.set("spark.master", master)
-            conf.set("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
-            conf.set("spark.local.connect", "1")
+
+            # Configurations to be overwritten
+            overwrite_conf = opts
+            overwrite_conf["spark.master"] = master
+            overwrite_conf["spark.local.connect"] = "1"
+
+            # Configurations to be set if unset.
+            default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}
+
+            def create_conf(**kwargs: Any) -> SparkConf:
+                conf = SparkConf(**kwargs)
+                for k, v in overwrite_conf.items():
+                    conf.set(k, v)
+                for k, v in default_conf.items():
+                    if not conf.contains(k):
+                        conf.set(k, v)
+                return conf
 
             # Check if we're using unreleased version that is in development.
             # Also checks SPARK_TESTING for RC versions.
             is_dev_mode = (
                 "dev" in LooseVersion(__version__).version or "SPARK_TESTING" 
in os.environ
             )
+
             origin_remote = os.environ.get("SPARK_REMOTE", None)
             try:
                 if origin_remote is not None:
@@ -516,7 +522,8 @@ class SparkSession:
                     # start the regular PySpark session.
                     del os.environ["SPARK_REMOTE"]
 
-                connect_jar = None
+                SparkContext._ensure_initialized(conf=create_conf(loadDefaults=False))
+
                 if is_dev_mode:
                     # Try and catch for a possibility in production because pyspark.testing
                     # does not exist in the canonical release.
@@ -536,29 +543,18 @@ class SparkSession:
                                 " was not found. Manually locate the jars and 
specify them, e.g., "
                                 "'spark.jars' configuration."
                             )
+                        else:
+                            pyutils = SparkContext._jvm.PythonSQLUtils  # type: ignore[union-attr]
+                            pyutils.addJarToCurrentClassLoader(connect_jar)
+
                     except ImportError:
                         pass
 
-                # Note that JVM is already up at this point in the case of Python
-                # application submission.
-                with SparkContext._lock:
-                    if not SparkContext._gateway:
-                        SparkContext._gateway = launch_gateway(conf)
-                        SparkContext._jvm = SparkContext._gateway.jvm
-                        if connect_jar is not None:
-                            SparkContext._jvm.PythonSQLUtils.addJarToCurrentClassLoader(connect_jar)
-
-                        # Now, JVM is up, and respect the default set.
-                        prev = conf
-                        conf = SparkConf(_jvm=SparkContext._jvm)
-                        conf.set("spark.master", master)
-                        for k, v in prev.getAll():
-                            if not conf.contains(k):
-                                conf.set(k, v)
-
                 # The regular PySpark session is registered as an active session
                 # so would not be garbage-collected.
-                PySparkSession(SparkContext.getOrCreate(conf))
+                PySparkSession(
+                    SparkContext.getOrCreate(create_conf(loadDefaults=True, _jvm=SparkContext._jvm))
+                )
             finally:
                 if origin_remote is not None:
                     os.environ["SPARK_REMOTE"] = origin_remote
diff --git a/python/pyspark/testing/connectutils.py b/python/pyspark/testing/connectutils.py
index 210d525ade7..7bc51902490 100644
--- a/python/pyspark/testing/connectutils.py
+++ b/python/pyspark/testing/connectutils.py
@@ -25,7 +25,6 @@ from pyspark import Row, SparkConf
 from pyspark.testing.utils import PySparkErrorTestUtils
 from pyspark.testing.sqlutils import (
     have_pandas,
-    have_pyarrow,
     pandas_requirement_message,
     pyarrow_requirement_message,
     SQLTestUtils,
@@ -55,27 +54,11 @@ except ImportError as e:
     googleapis_common_protos_requirement_message = str(e)
 have_googleapis_common_protos = googleapis_common_protos_requirement_message is None
 
-if (
-    have_pandas
-    and have_pyarrow
-    and have_grpc
-    and have_grpc_status
-    and have_googleapis_common_protos
-):
-    connect_not_compiled_message = None
-else:
-    connect_not_compiled_message = (
-        "Skipping all Spark Connect Python tests as the optional Spark Connect project was "
-        "not compiled into a JAR. To run these tests, you need to build Spark with "
-        "'build/sbt package' or 'build/mvn package' before running this test."
-    )
-
 
 connect_requirement_message = (
     pandas_requirement_message
     or pyarrow_requirement_message
     or grpc_requirement_message
-    or connect_not_compiled_message
     or googleapis_common_protos_requirement_message
     or grpc_status_requirement_message
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
