Repository: spark Updated Branches: refs/heads/master a1413b366 -> 232d7f8d4
[SPARK-11114][PYSPARK] add getOrCreate for SparkContext/SQLContext in Python Also added SQLContext.newSession() Author: Davies Liu <[email protected]> Closes #9122 from davies/py_create. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/232d7f8d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/232d7f8d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/232d7f8d Branch: refs/heads/master Commit: 232d7f8d42950431f1d9be2a6bb3591fb6ea20d6 Parents: a1413b3 Author: Davies Liu <[email protected]> Authored: Mon Oct 19 16:18:20 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Mon Oct 19 16:18:20 2015 -0700 ---------------------------------------------------------------------- python/pyspark/context.py | 16 ++++++++++++++-- python/pyspark/sql/context.py | 27 +++++++++++++++++++++++++++ python/pyspark/sql/tests.py | 14 ++++++++++++++ python/pyspark/tests.py | 4 ++++ 4 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 4969d85..afd74d9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -21,7 +21,7 @@ import os import shutil import signal import sys -from threading import Lock +from threading import RLock from tempfile import NamedTemporaryFile from pyspark import accumulators @@ -65,7 +65,7 @@ class SparkContext(object): _jvm = None _next_accum_id = 0 _active_spark_context = None - _lock = Lock() + _lock = RLock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') @@ -280,6 +280,18 @@ class SparkContext(object): """ self.stop() + @classmethod + def getOrCreate(cls, conf=None): + """ + Get or instantiate a SparkContext and register it as a singleton object. + + :param conf: SparkConf (optional) + """ + with SparkContext._lock: + if SparkContext._active_spark_context is None: + SparkContext(conf=conf or SparkConf()) + return SparkContext._active_spark_context + def setLogLevel(self, logLevel): """ Control our logLevel. This overrides any user-defined log settings. http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 89c8c6e..7945365 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -75,6 +75,8 @@ class SQLContext(object): SQLContext in the JVM, instead we make all calls to this object. """ + _instantiatedContext = None + @ignore_unicode_prefix def __init__(self, sparkContext, sqlContext=None): """Creates a new SQLContext. @@ -99,6 +101,8 @@ class SQLContext(object): self._scala_SQLContext = sqlContext _monkey_patch_RDD(self) install_exception_handler() + if SQLContext._instantiatedContext is None: + SQLContext._instantiatedContext = self @property def _ssql_ctx(self): @@ -111,6 +115,29 @@ class SQLContext(object): self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc()) return self._scala_SQLContext + @classmethod + @since(1.6) + def getOrCreate(cls, sc): + """ + Get the existing SQLContext or create a new one with given SparkContext. + + :param sc: SparkContext + """ + if cls._instantiatedContext is None: + jsqlContext = sc._jvm.SQLContext.getOrCreate(sc._jsc.sc()) + cls(sc, jsqlContext) + return cls._instantiatedContext + + @since(1.6) + def newSession(self): + """ + Returns a new SQLContext as new session, that has separate SQLConf, + registered temporary tables and UDFs, but shared SparkContext and + table cache. + """ + jsqlContext = self._ssql_ctx.newSession() + return self.__class__(self._sc, jsqlContext) + @since(1.3) def setConf(self, key, value): """Sets the given Spark SQL configuration property. http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 645133b..f465e1f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -174,6 +174,20 @@ class DataTypeTests(unittest.TestCase): self.assertEqual(dt.fromInternal(0), datetime.date(1970, 1, 1)) +class SQLContextTests(ReusedPySparkTestCase): + def test_get_or_create(self): + sqlCtx = SQLContext.getOrCreate(self.sc) + self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx) + + def test_new_session(self): + sqlCtx = SQLContext.getOrCreate(self.sc) + sqlCtx.setConf("test_key", "a") + sqlCtx2 = sqlCtx.newSession() + sqlCtx2.setConf("test_key", "b") + self.assertEqual(sqlCtx.getConf("test_key", ""), "a") + self.assertEqual(sqlCtx2.getConf("test_key", ""), "b") + + class SQLTests(ReusedPySparkTestCase): @classmethod http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 63cc87e..3c51809 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1883,6 +1883,10 @@ class ContextTests(unittest.TestCase): # Regression test for SPARK-1550 self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name")) + def test_get_or_create(self): + with SparkContext.getOrCreate() as sc: + self.assertTrue(SparkContext.getOrCreate() is sc) + def test_stop(self): sc = SparkContext() self.assertNotEqual(SparkContext._active_spark_context, None) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
