This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d99edac  [SPARK-36884][PYTHON] Inline type hints for pyspark.sql.session
d99edac is described below
commit d99edacb7a5b66581f98b26be6b1a775b794594a
Author: Takuya UESHIN <[email protected]>
AuthorDate: Thu Oct 7 11:22:51 2021 +0900
[SPARK-36884][PYTHON] Inline type hints for pyspark.sql.session
### What changes were proposed in this pull request?
Inline type hints from `python/pyspark/sql/session.pyi` to
`python/pyspark/sql/session.py`.
### Why are the changes needed?
Currently, there is a type hint stub file `python/pyspark/sql/session.pyi` that shows
the expected types for functions, but we can also take advantage of static type
checking within the functions by inlining the type hints.
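As a small illustrative sketch (not part of the patch itself): with a stub, the signature lives only in `session.pyi` and the function body in `session.py` is never type checked; once inlined, tools such as mypy check the body as well. For example, for `SparkSession.sql`:

```python
# Before: session.py is unannotated; session.pyi declares only the signature:
#     def sql(self, sqlQuery: str) -> DataFrame: ...
# A type checker sees the public API but does not check the body in session.py.

# After: the hints are inlined into session.py, so the body is checked too.
def sql(self, sqlQuery: str) -> DataFrame:
    """Returns a :class:`DataFrame` representing the result of the given query."""
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
```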
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing test.
Closes #34136 from ueshin/issues/SPARK-36884/inline_typehints.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/session.py | 272 ++++++++++++++++++++++++++++++-----------
python/pyspark/sql/session.pyi | 131 --------------------
2 files changed, 204 insertions(+), 199 deletions(-)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 4a3c6b3..60e2d69 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -19,23 +19,42 @@ import sys
import warnings
from functools import reduce
from threading import RLock
+from types import TracebackType
+from typing import (
+ Any, Dict, Iterable, List, Optional, Tuple, Type, Union,
+ cast, no_type_check, overload, TYPE_CHECKING
+)
-from pyspark import since
+from py4j.java_gateway import JavaObject # type: ignore[import]
+
+from pyspark import SparkConf, SparkContext, since
from pyspark.rdd import RDD
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.pandas.conversion import SparkConversionMixin
from pyspark.sql.readwriter import DataFrameReader
from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import DataType, StructType, \
- _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \
+from pyspark.sql.types import ( # type: ignore[attr-defined]
+ AtomicType, DataType, StructType,
+ _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter,
_parse_datatype_string
+)
from pyspark.sql.utils import install_exception_handler, is_timestamp_ntz_preferred
+if TYPE_CHECKING:
+ from pyspark.sql._typing import DateTimeLiteral, LiteralType, DecimalLiteral, RowLike
+ from pyspark.sql.catalog import Catalog
+ from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
+ from pyspark.sql.streaming import StreamingQueryManager
+ from pyspark.sql.udf import UDFRegistration
+
+
__all__ = ["SparkSession"]
-def _monkey_patch_RDD(sparkSession):
+def _monkey_patch_RDD(sparkSession: "SparkSession") -> None:
+
+ @no_type_check
def toDF(self, schema=None, sampleRatio=None):
"""
Converts current :class:`RDD` into a :class:`DataFrame`
@@ -65,7 +84,7 @@ def _monkey_patch_RDD(sparkSession):
"""
return sparkSession.createDataFrame(self, schema, sampleRatio)
- RDD.toDF = toDF
+ RDD.toDF = toDF # type: ignore[assignment]
class SparkSession(SparkConversionMixin):
@@ -107,10 +126,23 @@ class SparkSession(SparkConversionMixin):
"""
_lock = RLock()
- _options = {}
+ _options: Dict[str, Any] = {}
_sc = None
- def config(self, key=None, value=None, conf=None):
+ @overload
+ def config(self, *, conf: SparkConf) -> "SparkSession.Builder":
+ ...
+
+ @overload
+ def config(self, key: str, value: Any) -> "SparkSession.Builder":
+ ...
+
+ def config(
+ self,
+ key: Optional[str] = None,
+ value: Optional[Any] = None,
+ conf: Optional[SparkConf] = None
+ ) -> "SparkSession.Builder":
"""Sets a config option. Options set using this method are
automatically propagated to
both :class:`SparkConf` and :class:`SparkSession`'s own
configuration.
@@ -141,13 +173,13 @@ class SparkSession(SparkConversionMixin):
"""
with self._lock:
if conf is None:
- self._options[key] = str(value)
+ self._options[cast(str, key)] = str(value)
else:
for (k, v) in conf.getAll():
self._options[k] = v
return self
- def master(self, master):
+ def master(self, master: str) -> "SparkSession.Builder":
"""Sets the Spark master URL to connect to, such as "local" to run
locally, "local[4]"
to run locally with 4 cores, or "spark://master:7077" to run on a
Spark standalone
cluster.
@@ -161,7 +193,7 @@ class SparkSession(SparkConversionMixin):
"""
return self.config("spark.master", master)
- def appName(self, name):
+ def appName(self, name: str) -> "SparkSession.Builder":
"""Sets a name for the application, which will be shown in the
Spark web UI.
If no application name is set, a randomly generated name will be
used.
@@ -176,18 +208,18 @@ class SparkSession(SparkConversionMixin):
return self.config("spark.app.name", name)
@since(2.0)
- def enableHiveSupport(self):
+ def enableHiveSupport(self) -> "SparkSession.Builder":
"""Enables Hive support, including connectivity to a persistent
Hive metastore, support
for Hive SerDes, and Hive user-defined functions.
"""
return self.config("spark.sql.catalogImplementation", "hive")
- def _sparkContext(self, sc):
+ def _sparkContext(self, sc: SparkContext) -> "SparkSession.Builder":
with self._lock:
self._sc = sc
return self
- def getOrCreate(self):
+ def getOrCreate(self) -> "SparkSession":
"""Gets an existing :class:`SparkSession` or, if there is no
existing one, creates a
new one based on the options set in this builder.
@@ -217,7 +249,7 @@ class SparkSession(SparkConversionMixin):
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
session = SparkSession._instantiatedSession
- if session is None or session._sc._jsc is None:
+ if session is None or session._sc._jsc is None: # type: ignore[attr-defined]
if self._sc is not None:
sc = self._sc
else:
@@ -239,11 +271,11 @@ class SparkSession(SparkConversionMixin):
_instantiatedSession = None
_activeSession = None
- def __init__(self, sparkContext, jsparkSession=None):
+ def __init__(self, sparkContext: SparkContext, jsparkSession: Optional[JavaObject] = None):
from pyspark.sql.context import SQLContext
self._sc = sparkContext
- self._jsc = self._sc._jsc
- self._jvm = self._sc._jvm
+ self._jsc = self._sc._jsc # type: ignore[attr-defined]
+ self._jvm = self._sc._jvm # type: ignore[attr-defined]
if jsparkSession is None:
if self._jvm.SparkSession.getDefaultSession().isDefined() \
and not self._jvm.SparkSession.getDefaultSession().get() \
@@ -266,7 +298,7 @@ class SparkSession(SparkConversionMixin):
self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
self._jvm.SparkSession.setActiveSession(self._jsparkSession)
- def _repr_html_(self):
+ def _repr_html_(self) -> str:
return """
<div>
<p><b>SparkSession - {catalogImplementation}</b></p>
@@ -274,11 +306,11 @@ class SparkSession(SparkConversionMixin):
</div>
""".format(
catalogImplementation=self.conf.get("spark.sql.catalogImplementation"),
- sc_HTML=self.sparkContext._repr_html_()
+ sc_HTML=self.sparkContext._repr_html_() # type: ignore[attr-defined]
)
@since(2.0)
- def newSession(self):
+ def newSession(self) -> "SparkSession":
"""
Returns a new :class:`SparkSession` as new session, that has separate SQLConf,
registered temporary views and UDFs, but shared :class:`SparkContext` and
@@ -287,7 +319,7 @@ class SparkSession(SparkConversionMixin):
return self.__class__(self._sc, self._jsparkSession.newSession())
@classmethod
- def getActiveSession(cls):
+ def getActiveSession(cls) -> Optional["SparkSession"]:
"""
Returns the active :class:`SparkSession` for the current thread, returned by the builder
@@ -308,7 +340,7 @@ class SparkSession(SparkConversionMixin):
[Row(age=1)]
"""
from pyspark import SparkContext
- sc = SparkContext._active_spark_context
+ sc = SparkContext._active_spark_context # type: ignore[attr-defined]
if sc is None:
return None
else:
@@ -318,21 +350,21 @@ class SparkSession(SparkConversionMixin):
else:
return None
- @property
+ @property # type: ignore[misc]
@since(2.0)
- def sparkContext(self):
+ def sparkContext(self) -> SparkContext:
"""Returns the underlying :class:`SparkContext`."""
return self._sc
- @property
+ @property # type: ignore[misc]
@since(2.0)
- def version(self):
+ def version(self) -> str:
"""The version of Spark on which this application is running."""
return self._jsparkSession.version()
- @property
+ @property # type: ignore[misc]
@since(2.0)
- def conf(self):
+ def conf(self) -> RuntimeConfig:
"""Runtime configuration interface for Spark.
This is the interface through which the user can get and set all Spark and Hadoop
@@ -348,7 +380,7 @@ class SparkSession(SparkConversionMixin):
return self._conf
@property
- def catalog(self):
+ def catalog(self) -> "Catalog":
"""Interface through which the user may create, drop, alter or query
underlying
databases, tables, functions, etc.
@@ -364,7 +396,7 @@ class SparkSession(SparkConversionMixin):
return self._catalog
@property
- def udf(self):
+ def udf(self) -> "UDFRegistration":
"""Returns a :class:`UDFRegistration` for UDF registration.
.. versionadded:: 2.0.0
@@ -376,7 +408,13 @@ class SparkSession(SparkConversionMixin):
from pyspark.sql.udf import UDFRegistration
return UDFRegistration(self)
- def range(self, start, end=None, step=1, numPartitions=None):
+ def range(
+ self,
+ start: int,
+ end: Optional[int] = None,
+ step: int = 1,
+ numPartitions: Optional[int] = None
+ ) -> DataFrame:
"""
Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
@@ -419,7 +457,9 @@ class SparkSession(SparkConversionMixin):
return DataFrame(jdf, self._wrapped)
- def _inferSchemaFromList(self, data, names=None):
+ def _inferSchemaFromList(
+ self, data: Iterable[Any], names: Optional[List[str]] = None
+ ) -> StructType:
"""
Infer schema from list of Row, dict, or tuple.
@@ -436,7 +476,7 @@ class SparkSession(SparkConversionMixin):
"""
if not data:
raise ValueError("can not infer schema from empty dataset")
- infer_dict_as_struct = self._wrapped._conf.inferDictAsStruct()
+ infer_dict_as_struct = self._wrapped._conf.inferDictAsStruct() # type: ignore[attr-defined]
prefer_timestamp_ntz = is_timestamp_ntz_preferred()
schema = reduce(_merge_type, (
_infer_schema(row, names, infer_dict_as_struct, prefer_timestamp_ntz)
@@ -445,7 +485,12 @@ class SparkSession(SparkConversionMixin):
raise ValueError("Some of types cannot be determined after
inferring")
return schema
- def _inferSchema(self, rdd, samplingRatio=None, names=None):
+ def _inferSchema(
+ self,
+ rdd: "RDD[Any]",
+ samplingRatio: Optional[float] = None,
+ names: Optional[List[str]] = None
+ ) -> StructType:
"""
Infer schema from an RDD of Row, dict, or tuple.
@@ -466,7 +511,7 @@ class SparkSession(SparkConversionMixin):
raise ValueError("The first row in RDD is empty, "
"can not infer schema")
- infer_dict_as_struct = self._wrapped._conf.inferDictAsStruct()
+ infer_dict_as_struct = self._wrapped._conf.inferDictAsStruct() # type: ignore[attr-defined]
prefer_timestamp_ntz = is_timestamp_ntz_preferred()
if samplingRatio is None:
schema = _infer_schema(
@@ -492,28 +537,38 @@ class SparkSession(SparkConversionMixin):
prefer_timestamp_ntz=prefer_timestamp_ntz)).reduce(_merge_type)
return schema
- def _createFromRDD(self, rdd, schema, samplingRatio):
+ def _createFromRDD(
+ self,
+ rdd: "RDD[Any]",
+ schema: Optional[Union[DataType, List[str]]],
+ samplingRatio: Optional[float]
+ ) -> Tuple["RDD[Tuple]", StructType]:
"""
Create an RDD for DataFrame from an existing RDD, returns the RDD and schema.
"""
if schema is None or isinstance(schema, (list, tuple)):
struct = self._inferSchema(rdd, samplingRatio, names=schema)
converter = _create_converter(struct)
- rdd = rdd.map(converter)
+ tupled_rdd = rdd.map(converter)
if isinstance(schema, (list, tuple)):
for i, name in enumerate(schema):
struct.fields[i].name = name
struct.names[i] = name
- schema = struct
- elif not isinstance(schema, StructType):
+ elif isinstance(schema, StructType):
+ struct = schema
+ tupled_rdd = rdd
+
+ else:
raise TypeError("schema should be StructType or list or None, but
got: %s" % schema)
# convert python objects to sql data
- rdd = rdd.map(schema.toInternal)
- return rdd, schema
+ internal_rdd = tupled_rdd.map(struct.toInternal)
+ return internal_rdd, struct
- def _createFromLocal(self, data, schema):
+ def _createFromLocal(
+ self, data: Iterable[Any], schema: Optional[Union[DataType, List[str]]]
+ ) -> Tuple["RDD[Tuple]", StructType]:
"""
Create an RDD for DataFrame from a list or pandas.DataFrame, returns the RDD and schema.
@@ -525,22 +580,25 @@ class SparkSession(SparkConversionMixin):
if schema is None or isinstance(schema, (list, tuple)):
struct = self._inferSchemaFromList(data, names=schema)
converter = _create_converter(struct)
- data = map(converter, data)
+ tupled_data: Iterable[Tuple] = map(converter, data)
if isinstance(schema, (list, tuple)):
for i, name in enumerate(schema):
struct.fields[i].name = name
struct.names[i] = name
- schema = struct
- elif not isinstance(schema, StructType):
+ elif isinstance(schema, StructType):
+ struct = schema
+ tupled_data = data
+
+ else:
raise TypeError("schema should be StructType or list or None, but
got: %s" % schema)
# convert python objects to sql data
- data = [schema.toInternal(row) for row in data]
- return self._sc.parallelize(data), schema
+ internal_data = [struct.toInternal(row) for row in tupled_data]
+ return self._sc.parallelize(internal_data), struct
@staticmethod
- def _create_shell_session():
+ def _create_shell_session() -> "SparkSession":
"""
Initialize a :class:`SparkSession` for a pyspark shell session. This is called from
shell.py to make error handling simpler without needing to declare local variables in
@@ -553,7 +611,8 @@ class SparkSession(SparkConversionMixin):
# Try to access HiveConf, it will raise exception if Hive is not added
conf = SparkConf()
if conf.get('spark.sql.catalogImplementation', 'hive').lower() == 'hive':
- SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
+ (SparkContext._jvm # type: ignore[attr-defined]
+ .org.apache.hadoop.hive.conf.HiveConf())
return SparkSession.builder\
.enableHiveSupport()\
.getOrCreate()
@@ -566,7 +625,66 @@ class SparkSession(SparkConversionMixin):
return SparkSession.builder.getOrCreate()
- def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
+ @overload
+ def createDataFrame(
+ self,
+ data: Union["RDD[RowLike]", Iterable["RowLike"]],
+ samplingRatio: Optional[float] = ...,
+ ) -> DataFrame:
+ ...
+
+ @overload
+ def createDataFrame(
+ self,
+ data: Union["RDD[RowLike]", Iterable["RowLike"]],
+ schema: Union[List[str], Tuple[str, ...]] = ...,
+ verifySchema: bool = ...,
+ ) -> DataFrame:
+ ...
+
+ @overload
+ def createDataFrame(
+ self,
+ data: Union[
+ "RDD[Union[DateTimeLiteral, LiteralType, DecimalLiteral]]",
+ Iterable[Union["DateTimeLiteral", "LiteralType",
"DecimalLiteral"]],
+ ],
+ schema: Union[AtomicType, str],
+ verifySchema: bool = ...,
+ ) -> DataFrame:
+ ...
+
+ @overload
+ def createDataFrame(
+ self,
+ data: Union["RDD[RowLike]", Iterable["RowLike"]],
+ schema: Union[StructType, str],
+ verifySchema: bool = ...,
+ ) -> DataFrame:
+ ...
+
+ @overload
+ def createDataFrame(
+ self, data: "PandasDataFrameLike", samplingRatio: Optional[float] = ...
+ ) -> DataFrame:
+ ...
+
+ @overload
+ def createDataFrame(
+ self,
+ data: "PandasDataFrameLike",
+ schema: Union[StructType, str],
+ verifySchema: bool = ...,
+ ) -> DataFrame:
+ ...
+
+ def createDataFrame( # type: ignore[misc]
+ self,
+ data: Union["RDD[Any]", Iterable[Any], "PandasDataFrameLike"],
+ schema: Optional[Union[AtomicType, StructType, str]] = None,
+ samplingRatio: Optional[float] = None,
+ verifySchema: bool = True
+ ) -> DataFrame:
"""
Creates a :class:`DataFrame` from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
@@ -684,14 +802,23 @@ class SparkSession(SparkConversionMixin):
has_pandas = False
if has_pandas and isinstance(data, pandas.DataFrame):
# Create a DataFrame from pandas DataFrame.
- return super(SparkSession, self).createDataFrame(
+ return super(SparkSession, self).createDataFrame( # type: ignore[call-overload]
data, schema, samplingRatio, verifySchema)
- return self._create_dataframe(data, schema, samplingRatio, verifySchema)
+ return self._create_dataframe(
+ data, schema, samplingRatio, verifySchema # type: ignore[arg-type]
+ )
- def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
+ def _create_dataframe(
+ self,
+ data: Union["RDD[Any]", Iterable[Any]],
+ schema: Optional[Union[DataType, List[str]]],
+ samplingRatio: Optional[float],
+ verifySchema: bool,
+ ) -> DataFrame:
if isinstance(schema, StructType):
verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
+ @no_type_check
def prepare(obj):
verify_func(obj)
return obj
@@ -702,6 +829,7 @@ class SparkSession(SparkConversionMixin):
verify_func = _make_type_verifier(
dataType, name="field value") if verifySchema else lambda _:
True
+ @no_type_check
def prepare(obj):
verify_func(obj)
return obj,
@@ -709,16 +837,18 @@ class SparkSession(SparkConversionMixin):
prepare = lambda obj: obj
if isinstance(data, RDD):
- rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
+ rdd, struct = self._createFromRDD(data.map(prepare), schema, samplingRatio)
else:
- rdd, schema = self._createFromLocal(map(prepare, data), schema)
- jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
- jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
+ rdd, struct = self._createFromLocal(map(prepare, data), schema)
+ jrdd = self._jvm.SerDeUtil.toJavaArray(
+ rdd._to_java_object_rdd() # type: ignore[attr-defined]
+ )
+ jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), struct.json())
df = DataFrame(jdf, self._wrapped)
- df._schema = schema
+ df._schema = struct # type: ignore[attr-defined]
return df
- def sql(self, sqlQuery):
+ def sql(self, sqlQuery: str) -> DataFrame:
"""Returns a :class:`DataFrame` representing the result of the given
query.
.. versionadded:: 2.0.0
@@ -736,7 +866,7 @@ class SparkSession(SparkConversionMixin):
"""
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
- def table(self, tableName):
+ def table(self, tableName: str) -> DataFrame:
"""Returns the specified table as a :class:`DataFrame`.
.. versionadded:: 2.0.0
@@ -755,7 +885,7 @@ class SparkSession(SparkConversionMixin):
return DataFrame(self._jsparkSession.table(tableName), self._wrapped)
@property
- def read(self):
+ def read(self) -> DataFrameReader:
"""
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.
@@ -769,7 +899,7 @@ class SparkSession(SparkConversionMixin):
return DataFrameReader(self._wrapped)
@property
- def readStream(self):
+ def readStream(self) -> DataStreamReader:
"""
Returns a :class:`DataStreamReader` that can be used to read data streams
as a streaming :class:`DataFrame`.
@@ -787,7 +917,7 @@ class SparkSession(SparkConversionMixin):
return DataStreamReader(self._wrapped)
@property
- def streams(self):
+ def streams(self) -> "StreamingQueryManager":
"""Returns a :class:`StreamingQueryManager` that allows managing all
the
:class:`StreamingQuery` instances active on `this` context.
@@ -805,7 +935,7 @@ class SparkSession(SparkConversionMixin):
return StreamingQueryManager(self._jsparkSession.streams())
@since(2.0)
- def stop(self):
+ def stop(self) -> None:
"""Stop the underlying :class:`SparkContext`.
"""
from pyspark.sql.context import SQLContext
@@ -815,17 +945,22 @@ class SparkSession(SparkConversionMixin):
self._jvm.SparkSession.clearActiveSession()
SparkSession._instantiatedSession = None
SparkSession._activeSession = None
- SQLContext._instantiatedContext = None
+ SQLContext._instantiatedContext = None # type: ignore[attr-defined]
@since(2.0)
- def __enter__(self):
+ def __enter__(self) -> "SparkSession":
"""
Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
"""
return self
@since(2.0)
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType]
+ ) -> None:
"""
Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
@@ -834,7 +969,7 @@ class SparkSession(SparkConversionMixin):
self.stop()
-def _test():
+def _test() -> None:
import os
import doctest
from pyspark.context import SparkContext
@@ -859,5 +994,6 @@ def _test():
if failure_count:
sys.exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/sql/session.pyi b/python/pyspark/sql/session.pyi
deleted file mode 100644
index 6cd2d3b..0000000
--- a/python/pyspark/sql/session.pyi
+++ /dev/null
@@ -1,131 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from typing import overload
-from typing import Any, Iterable, List, Optional, Tuple, Type, TypeVar, Union
-from types import TracebackType
-
-from py4j.java_gateway import JavaObject # type: ignore[import]
-
-from pyspark.sql._typing import DateTimeLiteral, LiteralType, DecimalLiteral, RowLike
-from pyspark.sql.pandas._typing import DataFrameLike
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.sql.catalog import Catalog
-from pyspark.sql.conf import RuntimeConfig
-from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.pandas.conversion import SparkConversionMixin
-from pyspark.sql.types import AtomicType, StructType
-from pyspark.sql.readwriter import DataFrameReader
-from pyspark.sql.streaming import DataStreamReader, StreamingQueryManager
-from pyspark.sql.udf import UDFRegistration
-
-T = TypeVar("T")
-
-class SparkSession(SparkConversionMixin):
- class Builder:
- @overload
- def config(self, *, conf: SparkConf) -> SparkSession.Builder: ...
- @overload
- def config(self, key: str, value: Any) -> SparkSession.Builder: ...
- def master(self, master: str) -> SparkSession.Builder: ...
- def appName(self, name: str) -> SparkSession.Builder: ...
- def enableHiveSupport(self) -> SparkSession.Builder: ...
- def getOrCreate(self) -> SparkSession: ...
- builder: SparkSession.Builder
- def __init__(
- self, sparkContext: SparkContext, jsparkSession: Optional[JavaObject] = ...
- ) -> None: ...
- def newSession(self) -> SparkSession: ...
- @classmethod
- def getActiveSession(cls) -> SparkSession: ...
- @property
- def sparkContext(self) -> SparkContext: ...
- @property
- def version(self) -> str: ...
- @property
- def conf(self) -> RuntimeConfig: ...
- @property
- def catalog(self) -> Catalog: ...
- @property
- def udf(self) -> UDFRegistration: ...
- def range(
- self,
- start: int,
- end: Optional[int] = ...,
- step: int = ...,
- numPartitions: Optional[int] = ...,
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self,
- data: Union[RDD[RowLike], Iterable[RowLike]],
- samplingRatio: Optional[float] = ...,
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self,
- data: Union[RDD[RowLike], Iterable[RowLike]],
- schema: Union[List[str], Tuple[str, ...]] = ...,
- verifySchema: bool = ...,
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self,
- data: Union[
- RDD[Union[DateTimeLiteral, LiteralType, DecimalLiteral]],
- Iterable[Union[DateTimeLiteral, LiteralType, DecimalLiteral]],
- ],
- schema: Union[AtomicType, str],
- verifySchema: bool = ...,
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self,
- data: Union[RDD[RowLike], Iterable[RowLike]],
- schema: Union[StructType, str],
- verifySchema: bool = ...,
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self, data: DataFrameLike, samplingRatio: Optional[float] = ...
- ) -> DataFrame: ...
- @overload
- def createDataFrame(
- self,
- data: DataFrameLike,
- schema: Union[StructType, str],
- verifySchema: bool = ...,
- ) -> DataFrame: ...
- def sql(self, sqlQuery: str) -> DataFrame: ...
- def table(self, tableName: str) -> DataFrame: ...
- @property
- def read(self) -> DataFrameReader: ...
- @property
- def readStream(self) -> DataStreamReader: ...
- @property
- def streams(self) -> StreamingQueryManager: ...
- def stop(self) -> None: ...
- def __enter__(self) -> SparkSession: ...
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None: ...
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]