Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e9ae1fda9 -> 27b5f31a0


[SPARK-11836][SQL] use existing SQLContext for udf/cast (1.5 branch)

udf/cast should reuse the existing SQLContext instead of creating a new JVM SQLContext on every call.

Author: Davies Liu <[email protected]>

Closes #9915 from davies/create_1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27b5f31a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27b5f31a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27b5f31a

Branch: refs/heads/branch-1.5
Commit: 27b5f31a08737969b69a43536acc04cb882a7bdc
Parents: e9ae1fd
Author: Davies Liu <[email protected]>
Authored: Mon Nov 23 13:43:48 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Nov 23 13:43:48 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql/column.py    | 6 +++---
 python/pyspark/sql/context.py   | 3 +++
 python/pyspark/sql/functions.py | 5 +++--
 3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27b5f31a/python/pyspark/sql/column.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 8af8637..29e9fba 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -318,9 +318,9 @@ class Column(object):
         if isinstance(dataType, basestring):
             jc = self._jc.cast(dataType)
         elif isinstance(dataType, DataType):
-            sc = SparkContext._active_spark_context
-            ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
-            jdt = ssql_ctx.parseDataType(dataType.json())
+            from pyspark.sql import SQLContext
+            ctx = SQLContext._instantiatedContext
+            jdt = ctx._ssql_ctx.parseDataType(dataType.json())
             jc = self._jc.cast(jdt)
         else:
             raise TypeError("unexpected type: %s" % type(dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/27b5f31a/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 0ef46c4..b42f8c4 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -75,6 +75,8 @@ class SQLContext(object):
         SQLContext in the JVM, instead we make all calls to this object.
     """
 
+    _instantiatedContext = None
+
     @ignore_unicode_prefix
     def __init__(self, sparkContext, sqlContext=None):
         """Creates a new SQLContext.
@@ -99,6 +101,7 @@ class SQLContext(object):
         self._scala_SQLContext = sqlContext
         _monkey_patch_RDD(self)
         install_exception_handler()
+        SQLContext._instantiatedContext = self
 
     @property
     def _ssql_ctx(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/27b5f31a/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3c631a0..fa3bd3f 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1422,14 +1422,15 @@ class UserDefinedFunction(object):
         self._judf = self._create_judf(name)
 
     def _create_judf(self, name):
+        from pyspark.sql import SQLContext
         f, returnType = self.func, self.returnType  # put them in closure `func`
         func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
         ser = AutoBatchedSerializer(PickleSerializer())
         command = (func, None, ser, ser)
         sc = SparkContext._active_spark_context
         pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
-        ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
-        jdt = ssql_ctx.parseDataType(self.returnType.json())
+        ctx = SQLContext._instantiatedContext
+        jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
         if name is None:
             name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__
         judf = sc._jvm.UserDefinedPythonFunction(name, bytearray(pickled_command), env, includes,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to