This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6d554a4933 [SPARK-49436][CONNECT][SQL] Common interface for
SQLContext
7f6d554a4933 is described below
commit 7f6d554a493330744113fa7934236d0dc5a90bc0
Author: Paddy Xu <[email protected]>
AuthorDate: Thu Dec 19 09:03:20 2024 +0900
[SPARK-49436][CONNECT][SQL] Common interface for SQLContext
### What changes were proposed in this pull request?
This PR adds an abstraction for `SQLContext` in the `spark-api` package.
Both sides (Classic and Connect) maintain their own implementation.
### Why are the changes needed?
To unify the API interface and make `SQLContext` available to Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes. Connect users are now able to call `sparkSession.sqlContext` and the
APIs it provides.
### How was this patch tested?
Not needed. All new methods are mirrored from SparkSession except
`tables()`, which is covered by existing tests in `ListTablesSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48958 from xupefei/api-sqlcontext.
Authored-by: Paddy Xu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 -
.../scala/org/apache/spark/sql/SQLContext.scala | 336 +++++++
.../scala/org/apache/spark/sql/SparkSession.scala | 3 +-
.../connect/ConnectClientUnsupportedErrors.scala | 3 -
.../spark/sql/UnsupportedFeaturesSuite.scala | 4 -
.../CheckConnectJvmClientCompatibility.scala | 7 +-
.../org/apache/spark/sql/api}/SQLContext.scala | 629 +++++-------
.../org/apache/spark/sql/api/SparkSession.scala | 2 +-
.../src/main/scala/org/apache/spark/shims.scala | 1 -
.../scala/org/apache/spark/sql/SQLContext.scala | 1063 ++++----------------
.../org/apache/spark/sql/SQLContextSuite.scala | 23 +
11 files changed, 847 insertions(+), 1229 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 695f89d741c1..deb62866f072 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5090,11 +5090,6 @@
"message" : [
"Access to the SparkContext."
]
- },
- "SESSION_SQL_CONTEXT" : {
- "message" : [
- "Access to the SparkSession SQL Context."
- ]
}
},
"sqlState" : "0A000"
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala
new file mode 100644
index 000000000000..3603eb6ea508
--- /dev/null
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{List => JList, Map => JMap, Properties}
+
+import scala.jdk.CollectionConverters.PropertiesHasAsScala
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Stable
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
+import org.apache.spark.sql.connect.ConnectConversions._
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+@Stable
+class SQLContext private[sql] (override val sparkSession: SparkSession)
+ extends api.SQLContext(sparkSession) {
+
+ /** @inheritdoc */
+ def newSession(): SQLContext = sparkSession.newSession().sqlContext
+
+ /** @inheritdoc */
+ def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
+
+ /** @inheritdoc */
+ def setConf(props: Properties): Unit = sparkSession.conf.synchronized {
+ props.asScala.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+ }
+
+ /** @inheritdoc */
+ def experimental: ExperimentalMethods = sparkSession.experimental
+
+ /** @inheritdoc */
+ def udf: UDFRegistration = sparkSession.udf
+
+ // scalastyle:off
+ // Disable style checker so "implicits" object can start with lowercase i
+
+ /** @inheritdoc */
+ object implicits extends SQLImplicits {
+
+ /** @inheritdoc */
+ override protected def session: SparkSession = sparkSession
+ }
+
+ // scalastyle:on
+
+ /** @inheritdoc */
+ def read: DataFrameReader = sparkSession.read
+
+ /** @inheritdoc */
+ def readStream: DataStreamReader = sparkSession.readStream
+
+ /**
+ * Returns a `StreamingQueryManager` that allows managing all the
+ * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active
on `this` context.
+ *
+ * @since 4.0.0
+ */
+ def streams: StreamingQueryManager = sparkSession.streams
+
+ /** @inheritdoc */
+ override def sparkContext: SparkContext = {
+ throw ConnectClientUnsupportedErrors.sparkContext()
+ }
+
+ /** @inheritdoc */
+ override def emptyDataFrame: Dataset[Row] = super.emptyDataFrame
+
+ /** @inheritdoc */
+ override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]):
Dataset[Row] =
+ super.createDataFrame(rdd)
+
+ /** @inheritdoc */
+ override def createDataFrame[A <: Product: TypeTag](data: Seq[A]):
Dataset[Row] =
+ super.createDataFrame(data)
+
+ /** @inheritdoc */
+ override def baseRelationToDataFrame(baseRelation: BaseRelation):
Dataset[Row] =
+ super.baseRelationToDataFrame(baseRelation)
+
+ /** @inheritdoc */
+ override def createDataFrame(rowRDD: RDD[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rowRDD, schema)
+
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] =
super.createDataset(data)
+
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] =
super.createDataset(data)
+
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: JList[T]): Dataset[T] =
+ super.createDataset(data)
+
+ /** @inheritdoc */
+ override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rowRDD, schema)
+
+ /** @inheritdoc */
+ override def createDataFrame(rows: JList[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rows, schema)
+
+ /** @inheritdoc */
+ override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row]
=
+ super.createDataFrame(rdd, beanClass)
+
+ /** @inheritdoc */
+ override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]):
Dataset[Row] =
+ super.createDataFrame(rdd, beanClass)
+
+ /** @inheritdoc */
+ override def createDataFrame(data: JList[_], beanClass: Class[_]):
Dataset[Row] =
+ super.createDataFrame(data, beanClass)
+
+ /** @inheritdoc */
+ override def createExternalTable(tableName: String, path: String):
Dataset[Row] =
+ super.createExternalTable(tableName, path)
+
+ /** @inheritdoc */
+ override def createExternalTable(
+ tableName: String,
+ path: String,
+ source: String): Dataset[Row] = {
+ super.createExternalTable(tableName, path, source)
+ }
+
+ /** @inheritdoc */
+ override def createExternalTable(
+ tableName: String,
+ source: String,
+ options: JMap[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, options)
+ }
+
+ /** @inheritdoc */
+ override def createExternalTable(
+ tableName: String,
+ source: String,
+ options: Map[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, options)
+ }
+
+ /** @inheritdoc */
+ override def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: JMap[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, schema, options)
+ }
+
+ /** @inheritdoc */
+ override def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, schema, options)
+ }
+
+ /** @inheritdoc */
+ override def range(end: Long): Dataset[Row] = super.range(end)
+
+ /** @inheritdoc */
+ override def range(start: Long, end: Long): Dataset[Row] =
super.range(start, end)
+
+ /** @inheritdoc */
+ override def range(start: Long, end: Long, step: Long): Dataset[Row] =
+ super.range(start, end, step)
+
+ /** @inheritdoc */
+ override def range(start: Long, end: Long, step: Long, numPartitions: Int):
Dataset[Row] =
+ super.range(start, end, step, numPartitions)
+
+ /** @inheritdoc */
+ override def sql(sqlText: String): Dataset[Row] = super.sql(sqlText)
+
+ /** @inheritdoc */
+ override def table(tableName: String): Dataset[Row] = super.table(tableName)
+
+ /** @inheritdoc */
+ override def tables(): Dataset[Row] = super.tables()
+
+ /** @inheritdoc */
+ override def tables(databaseName: String): Dataset[Row] =
super.tables(databaseName)
+
+ /** @inheritdoc */
+ override def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row]
=
+ super.applySchema(rowRDD, schema)
+
+ /** @inheritdoc */
+ override def applySchema(rowRDD: JavaRDD[Row], schema: StructType):
Dataset[Row] =
+ super.applySchema(rowRDD, schema)
+
+ /** @inheritdoc */
+ override def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] =
+ super.applySchema(rdd, beanClass)
+
+ /** @inheritdoc */
+ override def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row]
=
+ super.applySchema(rdd, beanClass)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def parquetFile(paths: String*): Dataset[Row] =
super.parquetFile(paths: _*)
+
+ /** @inheritdoc */
+ override def jsonFile(path: String): Dataset[Row] = super.jsonFile(path)
+
+ /** @inheritdoc */
+ override def jsonFile(path: String, schema: StructType): Dataset[Row] =
+ super.jsonFile(path, schema)
+
+ /** @inheritdoc */
+ override def jsonFile(path: String, samplingRatio: Double): Dataset[Row] =
+ super.jsonFile(path, samplingRatio)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String]): Dataset[Row] = super.jsonRDD(json)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String]): Dataset[Row] =
super.jsonRDD(json)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] =
+ super.jsonRDD(json, schema)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String], schema: StructType):
Dataset[Row] =
+ super.jsonRDD(json, schema)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row]
=
+ super.jsonRDD(json, samplingRatio)
+
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String], samplingRatio: Double):
Dataset[Row] =
+ super.jsonRDD(json, samplingRatio)
+
+ /** @inheritdoc */
+ override def load(path: String): Dataset[Row] = super.load(path)
+
+ /** @inheritdoc */
+ override def load(path: String, source: String): Dataset[Row] =
super.load(path, source)
+
+ /** @inheritdoc */
+ override def load(source: String, options: JMap[String, String]):
Dataset[Row] =
+ super.load(source, options)
+
+ /** @inheritdoc */
+ override def load(source: String, options: Map[String, String]):
Dataset[Row] =
+ super.load(source, options)
+
+ /** @inheritdoc */
+ override def load(
+ source: String,
+ schema: StructType,
+ options: JMap[String, String]): Dataset[Row] = {
+ super.load(source, schema, options)
+ }
+
+ /** @inheritdoc */
+ override def load(
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): Dataset[Row] = {
+ super.load(source, schema, options)
+ }
+
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String): Dataset[Row] =
super.jdbc(url, table)
+
+ /** @inheritdoc */
+ override def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int): Dataset[Row] = {
+ super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions)
+ }
+
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String, theParts: Array[String]):
Dataset[Row] = {
+ super.jdbc(url, table, theParts)
+ }
+}
+object SQLContext extends api.SQLContextCompanion {
+
+ override private[sql] type SQLContextImpl = SQLContext
+ override private[sql] type SparkContextImpl = SparkContext
+
+ /**
+ * Get the singleton SQLContext if it exists or create a new one.
+ *
+ * This function can be used to create a singleton SQLContext object that
can be shared across
+ * the JVM.
+ *
+ * If there is an active SQLContext for current thread, it will be returned
instead of the
+ * global one.
+ *
+ * @param sparkContext
+ * The SparkContext. This parameter is not used in Spark Connect.
+ *
+ * @since 4.0.0
+ */
+ def getOrCreate(sparkContext: SparkContext): SQLContext = {
+ SparkSession.builder().getOrCreate().sqlContext
+ }
+
+ /** @inheritdoc */
+ override def setActive(sqlContext: SQLContext): Unit =
super.setActive(sqlContext)
+}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 939a1341a891..b6bba8251913 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -188,8 +188,7 @@ class SparkSession private[sql] (
throw ConnectClientUnsupportedErrors.sessionState()
/** @inheritdoc */
- override def sqlContext: SQLContext =
- throw ConnectClientUnsupportedErrors.sqlContext()
+ override val sqlContext: SQLContext = new SQLContext(this)
/** @inheritdoc */
override def listenerManager: ExecutionListenerManager =
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
index e73bcb8a0059..5783a20348d7 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
@@ -53,7 +53,4 @@ private[sql] object ConnectClientUnsupportedErrors {
def sparkContext(): SparkUnsupportedOperationException =
unsupportedFeatureException("SESSION_SPARK_CONTEXT")
-
- def sqlContext(): SparkUnsupportedOperationException =
- unsupportedFeatureException("SESSION_SQL_CONTEXT")
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
index 6a26cf581751..42ae6987c9f3 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
@@ -79,10 +79,6 @@ class UnsupportedFeaturesSuite extends ConnectFunSuite {
_.listenerManager
}
- testUnsupportedFeature("SparkSession.sqlContext", "SESSION_SQL_CONTEXT") {
- _.sqlContext
- }
-
testUnsupportedFeature(
"SparkSession.baseRelationToDataFrame",
"SESSION_BASE_RELATION_TO_DATAFRAME") {
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index b5ea973aa1d7..4ec84a4087eb 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -176,8 +176,6 @@ object CheckConnectJvmClientCompatibility {
// Skip unsupported classes
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ExperimentalMethods"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext$*"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSessionExtensions"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.SparkSessionExtensionsProvider"),
@@ -331,6 +329,11 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession#Builder.interceptor"),
+ // Private case class in SQLContext
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext$ListTableRow"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.SQLContext$ListTableRow$"),
+
// SQLImplicits
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.session"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
similarity index 55%
copy from sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
index 636899a7acb0..50590fffa152 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
@@ -15,34 +15,31 @@
* limitations under the License.
*/
-package org.apache.spark.sql
-
-import java.util.Properties
+package org.apache.spark.sql.api
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.{SparkConf, SparkContext}
+import _root_.java.util.{List => JList, Map => JMap, Properties}
+
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace,
UnresolvedNamespace}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ShowTables
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.{Encoder, Encoders, ExperimentalMethods, Row}
+import org.apache.spark.sql.api.SQLImplicits
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.functions.{array_size, coalesce, col, lit, when}
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
/**
* The entry point for working with structured data (rows and columns) in
Spark 1.x.
*
- * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are
keeping the class
- * here for backward compatibility.
+ * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are
keeping the class here
+ * for backward compatibility.
*
* @groupname basic Basic Operations
* @groupname ddl_ops Persistent Catalog DDL
@@ -56,47 +53,27 @@ import org.apache.spark.sql.util.ExecutionListenerManager
* @since 1.0.0
*/
@Stable
-class SQLContext private[sql](val sparkSession: SparkSession)
- extends Logging with Serializable {
-
- self =>
-
- sparkSession.sparkContext.assertNotStopped()
+abstract class SQLContext private[sql] (val sparkSession: SparkSession)
+ extends Logging
+ with Serializable {
// Note: Since Spark 2.0 this class has become a wrapper of SparkSession,
where the
// real functionality resides. This class remains mainly for backward
compatibility.
- @deprecated("Use SparkSession.builder instead", "2.0.0")
- def this(sc: SparkContext) = {
- this(SparkSession.builder().sparkContext(sc).getOrCreate())
- }
-
- @deprecated("Use SparkSession.builder instead", "2.0.0")
- def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
-
- // TODO: move this logic into SparkSession
-
- private[sql] def sessionState: SessionState = sparkSession.sessionState
- private[sql] def sharedState: SharedState = sparkSession.sharedState
- @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
- private[sql] def conf: SQLConf = sessionState.conf
-
def sparkContext: SparkContext = sparkSession.sparkContext
/**
- * Returns a [[SQLContext]] as new session, with separated SQL
configurations, temporary
- * tables, registered functions, but sharing the same `SparkContext`, cached
data and
- * other things.
+ * Returns a [[SQLContext]] as new session, with separated SQL
configurations, temporary tables,
+ * registered functions, but sharing the same `SparkContext`, cached data
and other things.
*
* @since 1.6.0
*/
- def newSession(): SQLContext = sparkSession.newSession().sqlContext
+ def newSession(): SQLContext
/**
- * An interface to register custom
[[org.apache.spark.sql.util.QueryExecutionListener]]s
- * that listen for execution metrics.
+ * An interface to register custom QueryExecutionListener that listen for
execution metrics.
*/
- def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
+ def listenerManager: ExecutionListenerManager
/**
* Set Spark SQL configuration properties.
@@ -104,16 +81,7 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group config
* @since 1.0.0
*/
- def setConf(props: Properties): Unit = {
- sessionState.conf.setConf(props)
- }
-
- /**
- * Set the given Spark SQL configuration property.
- */
- private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
- sessionState.conf.setConf(entry, value)
- }
+ def setConf(props: Properties): Unit
/**
* Set the given Spark SQL configuration property.
@@ -147,8 +115,8 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
}
/**
- * Return all the configuration properties that have been set (i.e. not the
default).
- * This creates a new copy of the config properties in the form of a Map.
+ * Return all the configuration properties that have been set (i.e. not the
default). This
+ * creates a new copy of the config properties in the form of a Map.
*
* @group config
* @since 1.0.0
@@ -158,9 +126,8 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
}
/**
- * :: Experimental ::
- * A collection of methods that are considered experimental, but can be used
to hook into
- * the query planner for advanced functionality.
+ * :: Experimental :: A collection of methods that are considered
experimental, but can be used
+ * to hook into the query planner for advanced functionality.
*
* @group basic
* @since 1.3.0
@@ -168,7 +135,7 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
@Experimental
@transient
@Unstable
- def experimental: ExperimentalMethods = sparkSession.experimental
+ def experimental: ExperimentalMethods
/**
* Returns a `DataFrame` with no rows or columns.
@@ -176,7 +143,7 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group basic
* @since 1.3.0
*/
- def emptyDataFrame: DataFrame = sparkSession.emptyDataFrame
+ def emptyDataFrame: Dataset[Row] = sparkSession.emptyDataFrame
/**
* A collection of methods for registering user-defined functions (UDF).
@@ -193,14 +160,29 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* DataTypes.StringType);
* }}}
*
- * @note The user-defined functions must be deterministic. Due to
optimization,
- * duplicate invocations may be eliminated or the function may even be
invoked more times than
- * it is present in the query.
+ * @note
+ * The user-defined functions must be deterministic. Due to optimization,
duplicate
+ * invocations may be eliminated or the function may even be invoked more
times than it is
+ * present in the query.
*
* @group basic
* @since 1.3.0
*/
- def udf: UDFRegistration = sparkSession.udf
+ def udf: UDFRegistration
+
+ /**
+ * (Scala-specific) Implicit methods available in Scala for converting
common Scala objects into
+ * `DataFrame`s.
+ *
+ * {{{
+ * val sqlContext = new SQLContext(sc)
+ * import sqlContext.implicits._
+ * }}}
+ *
+ * @group basic
+ * @since 1.3.0
+ */
+ val implicits: SQLImplicits
/**
* Returns true if the table is currently cached in-memory.
@@ -237,32 +219,13 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
sparkSession.catalog.clearCache()
}
- // scalastyle:off
- // Disable style checker so "implicits" object can start with lowercase i
- /**
- * (Scala-specific) Implicit methods available in Scala for converting
- * common Scala objects into `DataFrame`s.
- *
- * {{{
- * val sqlContext = new SQLContext(sc)
- * import sqlContext.implicits._
- * }}}
- *
- * @group basic
- * @since 1.3.0
- */
- object implicits extends SQLImplicits {
- override protected def session: SparkSession = sparkSession
- }
- // scalastyle:on
-
/**
* Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
*
* @group dataframes
* @since 1.3.0
*/
- def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
+ def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row] = {
sparkSession.createDataFrame(rdd)
}
@@ -272,7 +235,7 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group dataframes
* @since 1.3.0
*/
- def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
+ def createDataFrame[A <: Product: TypeTag](data: Seq[A]): Dataset[Row] = {
sparkSession.createDataFrame(data)
}
@@ -282,16 +245,15 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group dataframes
* @since 1.3.0
*/
- def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
+ def baseRelationToDataFrame(baseRelation: BaseRelation): Dataset[Row] = {
sparkSession.baseRelationToDataFrame(baseRelation)
}
/**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given
schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- * Example:
+ * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing
+ * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important
to make sure that
+ * the structure of every [[org.apache.spark.sql.Row Row]] of the provided
RDD matches the
+ * provided schema. Otherwise, there will be runtime exception. Example:
* {{{
* import org.apache.spark.sql._
* import org.apache.spark.sql.types._
@@ -319,17 +281,18 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @since 1.3.0
*/
@DeveloperApi
- def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+ def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row] = {
sparkSession.createDataFrame(rowRDD, schema)
}
/**
* Creates a [[Dataset]] from a local Seq of data of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
+ * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL
+ * representation) that is generally created automatically through implicits
from a
+ * `SparkSession`, or can be created explicitly by calling static methods on
+ * [[org.apache.spark.sql.Encoders Encoders]].
*
- * == Example ==
+ * ==Example==
*
* {{{
*
@@ -351,30 +314,30 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @since 2.0.0
* @group dataset
*/
- def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
+ def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
sparkSession.createDataset(data)
}
/**
- * Creates a [[Dataset]] from an RDD of a given type. This method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
+ * Creates a [[Dataset]] from an RDD of a given type. This method requires
an encoder (to
+ * convert a JVM object of type `T` to and from the internal Spark SQL
representation) that is
+ * generally created automatically through implicits from a `SparkSession`,
or can be created
+ * explicitly by calling static methods on [[org.apache.spark.sql.Encoders
Encoders]].
*
* @since 2.0.0
* @group dataset
*/
- def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
+ def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = {
sparkSession.createDataset(data)
}
/**
- * Creates a [[Dataset]] from a `java.util.List` of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
+ * Creates a [[Dataset]] from a `JList` of a given type. This method
requires an encoder (to
+ * convert a JVM object of type `T` to and from the internal Spark SQL
representation) that is
+ * generally created automatically through implicits from a `SparkSession`,
or can be created
+ * explicitly by calling static methods on [[org.apache.spark.sql.Encoders
Encoders]].
*
- * == Java Example ==
+ * ==Java Example==
*
* {{{
* List<String> data = Arrays.asList("hello", "world");
@@ -384,83 +347,71 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @since 2.0.0
* @group dataset
*/
- def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
+ def createDataset[T: Encoder](data: JList[T]): Dataset[T] = {
sparkSession.createDataset(data)
}
/**
- * Creates a DataFrame from an RDD[Row]. User can specify whether the input
rows should be
- * converted to Catalyst rows.
- */
- private[sql]
- def internalCreateDataFrame(
- catalystRows: RDD[InternalRow],
- schema: StructType,
- isStreaming: Boolean = false) = {
- sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
- }
-
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the
given schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
+ * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing
+ * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important
to make sure that
+ * the structure of every [[org.apache.spark.sql.Row Row]] of the provided
RDD matches the
+ * provided schema. Otherwise, there will be runtime exception.
*
* @group dataframes
* @since 1.3.0
*/
@DeveloperApi
- def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+ def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row]
= {
sparkSession.createDataFrame(rowRDD, schema)
}
/**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
the given schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided List matches
- * the provided schema. Otherwise, there will be runtime exception.
+ * :: DeveloperApi :: Creates a `DataFrame` from a `JList` containing
+ * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important
to make sure that
+ * the structure of every [[org.apache.spark.sql.Row Row]] of the provided
List matches the
+ * provided schema. Otherwise, there will be a runtime exception.
*
* @group dataframes
* @since 1.6.0
*/
@DeveloperApi
- def createDataFrame(rows: java.util.List[Row], schema: StructType):
DataFrame = {
+ def createDataFrame(rows: JList[Row], schema: StructType): Dataset[Row] = {
sparkSession.createDataFrame(rows, schema)
}
/**
* Applies a schema to an RDD of Java Beans.
*
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
SELECT * queries
+ * will return the columns in an undefined order.
* @group dataframes
* @since 1.3.0
*/
- def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] = {
sparkSession.createDataFrame(rdd, beanClass)
}
/**
* Applies a schema to an RDD of Java Beans.
*
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
SELECT * queries
+ * will return the columns in an undefined order.
* @group dataframes
* @since 1.3.0
*/
- def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] = {
sparkSession.createDataFrame(rdd, beanClass)
}
/**
* Applies a schema to a List of Java Beans.
*
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
SELECT * queries
+ * will return the columns in an undefined order.
* @group dataframes
* @since 1.6.0
*/
- def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame
= {
+ def createDataFrame(data: JList[_], beanClass: Class[_]): Dataset[Row] = {
sparkSession.createDataFrame(data, beanClass)
}
@@ -475,8 +426,7 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group genericdata
* @since 1.4.0
*/
- def read: DataFrameReader = sparkSession.read
-
+ def read: DataFrameReader
/**
* Returns a `DataStreamReader` that can be used to read streaming data in
as a `DataFrame`.
@@ -487,33 +437,29 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
*
* @since 2.0.0
*/
- def readStream: DataStreamReader = sparkSession.readStream
-
+ def readStream: DataStreamReader
/**
- * Creates an external table from the given path and returns the
corresponding DataFrame.
- * It will use the default data source configured by
spark.sql.sources.default.
+ * Creates an external table from the given path and returns the
corresponding DataFrame. It
+ * will use the default data source configured by spark.sql.sources.default.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(tableName: String, path: String): DataFrame = {
+ def createExternalTable(tableName: String, path: String): Dataset[Row] = {
sparkSession.catalog.createTable(tableName, path)
}
/**
- * Creates an external table from the given path based on a data source
- * and returns the corresponding DataFrame.
+ * Creates an external table from the given path based on a data source and
returns the
+ * corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
- tableName: String,
- path: String,
- source: String): DataFrame = {
+ def createExternalTable(tableName: String, path: String, source: String):
Dataset[Row] = {
sparkSession.catalog.createTable(tableName, path, source)
}
@@ -528,14 +474,13 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
def createExternalTable(
tableName: String,
source: String,
- options: java.util.Map[String, String]): DataFrame = {
+ options: JMap[String, String]): Dataset[Row] = {
sparkSession.catalog.createTable(tableName, source, options)
}
/**
- * (Scala-specific)
- * Creates an external table from the given path based on a data source and
a set of options.
- * Then, returns the corresponding DataFrame.
+ * (Scala-specific) Creates an external table from the given path based on a
data source and a
+ * set of options. Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
@@ -544,13 +489,13 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
def createExternalTable(
tableName: String,
source: String,
- options: Map[String, String]): DataFrame = {
+ options: Map[String, String]): Dataset[Row] = {
sparkSession.catalog.createTable(tableName, source, options)
}
/**
- * Create an external table from the given path based on a data source, a
schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * Create an external table from the given path based on a data source, a
schema and a set of
+ * options. Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
@@ -560,14 +505,13 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
tableName: String,
source: String,
schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
+ options: JMap[String, String]): Dataset[Row] = {
sparkSession.catalog.createTable(tableName, source, schema, options)
}
/**
- * (Scala-specific)
- * Create an external table from the given path based on a data source, a
schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * (Scala-specific) Create an external table from the given path based on a
data source, a
+ * schema and a set of options. Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
@@ -577,23 +521,16 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
tableName: String,
source: String,
schema: StructType,
- options: Map[String, String]): DataFrame = {
+ options: Map[String, String]): Dataset[Row] = {
sparkSession.catalog.createTable(tableName, source, schema, options)
}
- /**
- * Registers the given `DataFrame` as a temporary table in the catalog.
Temporary tables exist
- * only during the lifetime of this instance of SQLContext.
- */
- private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String):
Unit = {
- df.createOrReplaceTempView(tableName)
- }
-
/**
* Drops the temporary table with the given table name in the catalog. If
the table has been
* cached/persisted before, it's also unpersisted.
*
- * @param tableName the name of the table to be unregistered.
+ * @param tableName
+ * the name of the table to be unregistered.
* @group basic
* @since 1.3.0
*/
@@ -602,54 +539,53 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
}
/**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from 0 to `end` (exclusive) with step value 1.
+ * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements in a
+ * range from 0 to `end` (exclusive) with step value 1.
*
* @since 1.4.1
* @group dataframe
*/
- def range(end: Long): DataFrame = sparkSession.range(end).toDF()
+ def range(end: Long): Dataset[Row] = sparkSession.range(end).toDF()
/**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with step value 1.
+ * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements in a
+ * range from `start` to `end` (exclusive) with step value 1.
*
* @since 1.4.0
* @group dataframe
*/
- def range(start: Long, end: Long): DataFrame = sparkSession.range(start,
end).toDF()
+ def range(start: Long, end: Long): Dataset[Row] = sparkSession.range(start,
end).toDF()
/**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with a step value.
+ * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements in a
+ * range from `start` to `end` (exclusive) with a step value.
*
* @since 2.0.0
* @group dataframe
*/
- def range(start: Long, end: Long, step: Long): DataFrame = {
+ def range(start: Long, end: Long, step: Long): Dataset[Row] = {
sparkSession.range(start, end, step).toDF()
}
/**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in an range from `start` to `end` (exclusive) with an step value, with
partition number
- * specified.
+ * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements in a
+ * range from `start` to `end` (exclusive) with a step value, with
partition number specified.
*
* @since 1.4.0
* @group dataframe
*/
- def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame
= {
+ def range(start: Long, end: Long, step: Long, numPartitions: Int):
Dataset[Row] = {
sparkSession.range(start, end, step, numPartitions).toDF()
}
/**
- * Executes a SQL query using Spark, returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+ * Executes a SQL query using Spark, returning the result as a `DataFrame`.
This API eagerly
+ * runs DDL/DML commands, but not for SELECT queries.
*
* @group basic
* @since 1.3.0
*/
- def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText)
+ def sql(sqlText: String): Dataset[Row] = sparkSession.sql(sqlText)
/**
* Returns the specified table as a `DataFrame`.
@@ -657,41 +593,55 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
- def table(tableName: String): DataFrame = {
+ def table(tableName: String): Dataset[Row] = {
sparkSession.table(tableName)
}
/**
- * Returns a `DataFrame` containing names of existing tables in the current
database.
- * The returned DataFrame has three columns, database, tableName and
isTemporary (a Boolean
+ * Returns a `DataFrame` containing names of existing tables in the current
database. The
+ * returned DataFrame has three columns, database, tableName and isTemporary
(a Boolean
* indicating if a table is a temporary one or not).
*
* @group ddl_ops
* @since 1.3.0
*/
- def tables(): DataFrame = {
- Dataset.ofRows(sparkSession, ShowTables(CurrentNamespace, None))
+ def tables(): Dataset[Row] = {
+ mapTableDatasetOutput(sparkSession.catalog.listTables())
}
/**
- * Returns a `DataFrame` containing names of existing tables in the given
database.
- * The returned DataFrame has three columns, database, tableName and
isTemporary (a Boolean
- * indicating if a table is a temporary one or not).
+ * Returns a `DataFrame` containing names of existing tables in the given
database. The returned
+ * DataFrame has three columns, database, tableName and isTemporary (a
Boolean indicating if a
+ * table is a temporary one or not).
*
* @group ddl_ops
* @since 1.3.0
*/
- def tables(databaseName: String): DataFrame = {
- Dataset.ofRows(sparkSession,
ShowTables(UnresolvedNamespace(Seq(databaseName)), None))
+ def tables(databaseName: String): Dataset[Row] = {
+ mapTableDatasetOutput(sparkSession.catalog.listTables(databaseName))
+ }
+
+ private def mapTableDatasetOutput(tables: Dataset[Table]): Dataset[Row] = {
+ tables
+ .select(
+ // Re-implement `org.apache.spark.sql.catalog.Table.database` method.
+ // Abusing `coalesce` to tell Spark all these columns are not nullable.
+ when(
+ coalesce(array_size(col("namespace")), lit(0)).equalTo(lit(1)),
+ coalesce(col("namespace")(0), lit("")))
+ .otherwise(lit(""))
+ .as("namespace"),
+ coalesce(col("name"), lit("")).as("tableName"),
+ col("isTemporary"))
}
/**
* Returns a `StreamingQueryManager` that allows managing all the
- * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active
on `this` context.
+ * [[org.apache.spark.sql.api.StreamingQuery StreamingQueries]] active on
`this` context.
*
* @since 2.0.0
*/
- def streams: StreamingQueryManager = sparkSession.streams
+ def streams: StreamingQueryManager
/**
* Returns the names of tables in the current database as an array.
@@ -710,7 +660,11 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* @since 1.3.0
*/
def tableNames(databaseName: String): Array[String] = {
- sessionState.catalog.listTables(databaseName).map(_.table).toArray
+ sparkSession.catalog
+ .listTables(databaseName)
+ .select(col("name"))
+ .as(Encoders.STRING)
+ .collect()
}
////////////////////////////////////////////////////////////////////////////
@@ -720,34 +674,38 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
////////////////////////////////////////////////////////////////////////////
/**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ * @deprecated
+ * As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+ def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row] = {
createDataFrame(rowRDD, schema)
}
/**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ * @deprecated
+ * As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+ def applySchema(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row] = {
createDataFrame(rowRDD, schema)
}
/**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ * @deprecated
+ * As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+ def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] = {
createDataFrame(rdd, beanClass)
}
/**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ * @deprecated
+ * As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+ def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] = {
createDataFrame(rdd, beanClass)
}
@@ -756,82 +714,87 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* `DataFrame` if no paths are passed in.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().parquet()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().parquet()`.
*/
@deprecated("Use read.parquet() instead.", "1.4.0")
@scala.annotation.varargs
- def parquetFile(paths: String*): DataFrame = {
+ def parquetFile(paths: String*): Dataset[Row] = {
if (paths.isEmpty) {
emptyDataFrame
} else {
- read.parquet(paths : _*)
+ read.parquet(paths: _*)
}
}
/**
- * Loads a JSON file (one object per line), returning the result as a
`DataFrame`.
- * It goes through the entire dataset once to determine the schema.
+ * Loads a JSON file (one object per line), returning the result as a
`DataFrame`. It goes
+ * through the entire dataset once to determine the schema.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String): DataFrame = {
+ def jsonFile(path: String): Dataset[Row] = {
read.json(path)
}
/**
- * Loads a JSON file (one object per line) and applies the given schema,
- * returning the result as a `DataFrame`.
+ * Loads a JSON file (one object per line) and applies the given schema,
returning the result as
+ * a `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String, schema: StructType): DataFrame = {
+ def jsonFile(path: String, schema: StructType): Dataset[Row] = {
read.schema(schema).json(path)
}
/**
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+ def jsonFile(path: String, samplingRatio: Double): Dataset[Row] = {
read.option("samplingRatio", samplingRatio.toString).json(path)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record),
returning the result as a
- * `DataFrame`.
- * It goes through the entire dataset once to determine the schema.
+ * `DataFrame`. It goes through the entire dataset once to determine the
schema.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+ def jsonRDD(json: RDD[String]): Dataset[Row] = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record),
returning the result as a
- * `DataFrame`.
- * It goes through the entire dataset once to determine the schema.
+ * `DataFrame`. It goes through the entire dataset once to determine the
schema.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+ def jsonRDD(json: JavaRDD[String]): Dataset[Row] = read.json(json)
/**
- * Loads an RDD[String] storing JSON objects (one object per record) and
applies the given schema,
- * returning the result as a `DataFrame`.
+ * Loads an RDD[String] storing JSON objects (one object per record) and
applies the given
+ * schema, returning the result as a `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+ def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] = {
read.schema(schema).json(json)
}
@@ -840,46 +803,50 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* schema, returning the result as a `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+ def jsonRDD(json: JavaRDD[String], schema: StructType): Dataset[Row] = {
read.schema(schema).json(json)
}
/**
- * Loads an RDD[String] storing JSON objects (one object per record)
inferring the
- * schema, returning the result as a `DataFrame`.
+ * Loads an RDD[String] storing JSON objects (one object per record)
inferring the schema,
+ * returning the result as a `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+ def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row] = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
- * Loads a JavaRDD[String] storing JSON objects (one object per record)
inferring the
- * schema, returning the result as a `DataFrame`.
+ * Loads a JavaRDD[String] storing JSON objects (one object per record)
inferring the schema,
+ * returning the result as a `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+ def jsonRDD(json: JavaRDD[String], samplingRatio: Double): Dataset[Row] = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
- * Returns the dataset stored at path as a DataFrame,
- * using the default data source configured by spark.sql.sources.default.
+ * Returns the dataset stored at path as a DataFrame, using the default data
source configured
+ * by spark.sql.sources.default.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by `read().load(path)`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().load(path)`.
*/
@deprecated("Use read.load(path) instead.", "1.4.0")
- def load(path: String): DataFrame = {
+ def load(path: String): Dataset[Row] = {
read.load(path)
}
@@ -887,90 +854,96 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
* Returns the dataset stored at path as a DataFrame, using the given data
source.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().format(source).load(path)`.
*/
@deprecated("Use read.format(source).load(path) instead.", "1.4.0")
- def load(path: String, source: String): DataFrame = {
+ def load(path: String, source: String): Dataset[Row] = {
read.format(source).load(path)
}
/**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame.
+ * (Java-specific) Returns the dataset specified by the given data source
and a set of options
+ * as a DataFrame.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
+ * @deprecated
+ * As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
*/
@deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
- def load(source: String, options: java.util.Map[String, String]): DataFrame
= {
+ def load(source: String, options: JMap[String, String]): Dataset[Row] = {
read.options(options).format(source).load()
}
/**
- * (Scala-specific) Returns the dataset specified by the given data source
and
- * a set of options as a DataFrame.
+ * (Scala-specific) Returns the dataset specified by the given data source
and a set of options
+ * as a DataFrame.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
+ * @deprecated
+ * As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
*/
@deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
- def load(source: String, options: Map[String, String]): DataFrame = {
+ def load(source: String, options: Map[String, String]): Dataset[Row] = {
read.options(options).format(source).load()
}
/**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of
the DataFrame.
+ * (Java-specific) Returns the dataset specified by the given data source
and a set of options
+ * as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by
- * `read().format(source).schema(schema).options(options).load()`.
+ * @deprecated
+ * As of 1.4.0, replaced by
`read().format(source).schema(schema).options(options).load()`.
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
- def load(
- source: String,
- schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
+ def load(source: String, schema: StructType, options: JMap[String, String]):
Dataset[Row] = {
read.format(source).schema(schema).options(options).load()
}
/**
- * (Scala-specific) Returns the dataset specified by the given data source
and
- * a set of options as a DataFrame, using the given schema as the schema of
the DataFrame.
+ * (Scala-specific) Returns the dataset specified by the given data source
and a set of options
+ * as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
- * @deprecated As of 1.4.0, replaced by
- * `read().format(source).schema(schema).options(options).load()`.
+ * @deprecated
+ * As of 1.4.0, replaced by
`read().format(source).schema(schema).options(options).load()`.
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
- def load(source: String, schema: StructType, options: Map[String, String]):
DataFrame = {
+ def load(source: String, schema: StructType, options: Map[String, String]):
Dataset[Row] = {
read.format(source).schema(schema).options(options).load()
}
/**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table.
+ * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
+ * table.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
- def jdbc(url: String, table: String): DataFrame = {
+ def jdbc(url: String, table: String): Dataset[Row] = {
read.jdbc(url, table, new Properties)
}
/**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table. Partitions of the table will be retrieved in parallel
based on the parameters
- * passed to this function.
+ * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
+ * table. Partitions of the table will be retrieved in parallel based on the
parameters passed
+ * to this function.
*
- * @param columnName the name of a column of integral type that will be used
for partitioning.
- * @param lowerBound the minimum value of `columnName` used to decide
partition stride
- * @param upperBound the maximum value of `columnName` used to decide
partition stride
- * @param numPartitions the number of partitions. the range
`minValue`-`maxValue` will be split
- * evenly into this many partitions
+ * @param columnName
+ * the name of a column of integral type that will be used for
partitioning.
+ * @param lowerBound
+ * the minimum value of `columnName` used to decide partition stride
+ * @param upperBound
+ * the maximum value of `columnName` used to decide partition stride
+ * @param numPartitions
+ * the number of partitions. the range `minValue`-`maxValue` will be split
evenly into this
+ * many partitions
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
def jdbc(
@@ -979,34 +952,36 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
columnName: String,
lowerBound: Long,
upperBound: Long,
- numPartitions: Int): DataFrame = {
+ numPartitions: Int): Dataset[Row] = {
read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions,
new Properties)
}
/**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table. The theParts parameter gives a list expressions
- * suitable for inclusion in WHERE clauses; each one defines one partition
- * of the `DataFrame`.
+ * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
+ * table. The theParts parameter gives a list of expressions suitable for
inclusion in WHERE
+ * clauses; each one defines one partition of the `DataFrame`.
*
* @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ * @deprecated
+ * As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
- def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+ def jdbc(url: String, table: String, theParts: Array[String]): Dataset[Row]
= {
read.jdbc(url, table, theParts, new Properties)
}
}
/**
- * This SQLContext object contains utility functions to create a singleton
SQLContext instance,
- * or to get the created SQLContext instance.
+ * This SQLContext object contains utility functions to create a singleton
SQLContext instance, or
+ * to get the created SQLContext instance.
*
* It also provides utility functions to support preference for threads in
multiple sessions
* scenario, setActive could set a SQLContext for current thread, which will
be returned by
* getOrCreate instead of the global one.
*/
-object SQLContext {
+trait SQLContextCompanion {
+ private[sql] type SQLContextImpl <: SQLContext
+ private[sql] type SparkContextImpl <: SparkContext
/**
* Get the singleton SQLContext if it exists or create a new one using the
given SparkContext.
@@ -1014,31 +989,29 @@ object SQLContext {
* This function can be used to create a singleton SQLContext object that
can be shared across
* the JVM.
*
- * If there is an active SQLContext for current thread, it will be returned
instead of the global
- * one.
+ * If there is an active SQLContext for current thread, it will be returned
instead of the
+ * global one.
*
* @since 1.5.0
*/
@deprecated("Use SparkSession.builder instead", "2.0.0")
- def getOrCreate(sparkContext: SparkContext): SQLContext = {
- SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
- }
+ def getOrCreate(sparkContext: SparkContextImpl): SQLContextImpl
/**
* Changes the SQLContext that will be returned in this thread and its
children when
- * SQLContext.getOrCreate() is called. This can be used to ensure that a
given thread receives
- * a SQLContext with an isolated session, instead of the global (first
created) context.
+ * SQLContext.getOrCreate() is called. This can be used to ensure that a
given thread receives a
+ * SQLContext with an isolated session, instead of the global (first
created) context.
*
* @since 1.6.0
*/
@deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
- def setActive(sqlContext: SQLContext): Unit = {
+ def setActive(sqlContext: SQLContextImpl): Unit = {
SparkSession.setActiveSession(sqlContext.sparkSession)
}
/**
- * Clears the active SQLContext for current thread. Subsequent calls to
getOrCreate will
- * return the first created context instead of a thread-local override.
+ * Clears the active SQLContext for current thread. Subsequent calls to
getOrCreate will return
+ * the first created context instead of a thread-local override.
*
* @since 1.6.0
*/
@@ -1046,52 +1019,4 @@ object SQLContext {
def clearActive(): Unit = {
SparkSession.clearActiveSession()
}
-
- /**
- * Converts an iterator of Java Beans to InternalRow using the provided
- * bean info & schema. This is not related to the singleton, but is a static
- * method for internal use.
- */
- private[sql] def beansToRows(
- data: Iterator[_],
- beanClass: Class[_],
- attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
- def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any
=> InternalRow = {
- val methodConverters =
- JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
- .map { case (property, fieldType) =>
- val method = property.getReadMethod
- method -> createConverter(method.getReturnType, fieldType)
- }
- value =>
- if (value == null) {
- null
- } else {
- new GenericInternalRow(
- methodConverters.map { case (method, converter) =>
- converter(method.invoke(value))
- })
- }
- }
- def createConverter(cls: Class[_], dataType: DataType): Any => Any =
dataType match {
- case struct: StructType => createStructConverter(cls,
struct.map(_.dataType))
- case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
- }
- val dataConverter = createStructConverter(beanClass, attrs.map(_.dataType))
- data.map(dataConverter)
- }
-
- /**
- * Extract `spark.sql.*` properties from the conf and return them as a
[[Properties]].
- */
- private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = {
- val properties = new Properties
- sparkConf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.sql")) {
- properties.setProperty(key, value)
- }
- }
- properties
- }
-
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 35f74497b96f..af2144cb9eb4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkContext,
SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, ExperimentalMethods, Row, RuntimeConfig,
SparkSessionExtensions, SQLContext}
+import org.apache.spark.sql.{Encoder, ExperimentalMethods, Row, RuntimeConfig,
SparkSessionExtensions}
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
index ad8771a03b28..9c5fb515580a 100644
--- a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
@@ -32,7 +32,6 @@ package rdd {
package sql {
class ExperimentalMethods
class SparkSessionExtensions
- class SQLContext
package execution {
class QueryExecution
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 636899a7acb0..1318563f8c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,21 +17,18 @@
package org.apache.spark.sql
-import java.util.Properties
+import java.util.{List => JList, Map => JMap, Properties}
-import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace,
UnresolvedNamespace}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ShowTables
+import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
@@ -41,8 +38,8 @@ import org.apache.spark.sql.util.ExecutionListenerManager
/**
* The entry point for working with structured data (rows and columns) in
Spark 1.x.
*
- * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are
keeping the class
- * here for backward compatibility.
+ * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are
keeping the class here
+ * for backward compatibility.
*
* @groupname basic Basic Operations
* @groupname ddl_ops Persistent Catalog DDL
@@ -56,8 +53,8 @@ import org.apache.spark.sql.util.ExecutionListenerManager
* @since 1.0.0
*/
@Stable
-class SQLContext private[sql](val sparkSession: SparkSession)
- extends Logging with Serializable {
+class SQLContext private[sql] (override val sparkSession: SparkSession)
+ extends api.SQLContext(sparkSession) {
self =>
@@ -77,980 +74,325 @@ class SQLContext private[sql](val sparkSession:
SparkSession)
// TODO: move this logic into SparkSession
private[sql] def sessionState: SessionState = sparkSession.sessionState
+
private[sql] def sharedState: SharedState = sparkSession.sharedState
+
@deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
private[sql] def conf: SQLConf = sessionState.conf
- def sparkContext: SparkContext = sparkSession.sparkContext
-
- /**
- * Returns a [[SQLContext]] as new session, with separated SQL
configurations, temporary
- * tables, registered functions, but sharing the same `SparkContext`, cached
data and
- * other things.
- *
- * @since 1.6.0
- */
- def newSession(): SQLContext = sparkSession.newSession().sqlContext
-
- /**
- * An interface to register custom
[[org.apache.spark.sql.util.QueryExecutionListener]]s
- * that listen for execution metrics.
- */
+ /** @inheritdoc */
def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
- /**
- * Set Spark SQL configuration properties.
- *
- * @group config
- * @since 1.0.0
- */
+ /** @inheritdoc */
def setConf(props: Properties): Unit = {
sessionState.conf.setConf(props)
}
- /**
- * Set the given Spark SQL configuration property.
- */
private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
sessionState.conf.setConf(entry, value)
}
- /**
- * Set the given Spark SQL configuration property.
- *
- * @group config
- * @since 1.0.0
- */
- def setConf(key: String, value: String): Unit = {
- sparkSession.conf.set(key, value)
- }
-
- /**
- * Return the value of Spark SQL configuration property for the given key.
- *
- * @group config
- * @since 1.0.0
- */
- def getConf(key: String): String = {
- sparkSession.conf.get(key)
- }
-
- /**
- * Return the value of Spark SQL configuration property for the given key.
If the key is not set
- * yet, return `defaultValue`.
- *
- * @group config
- * @since 1.0.0
- */
- def getConf(key: String, defaultValue: String): String = {
- sparkSession.conf.get(key, defaultValue)
- }
-
- /**
- * Return all the configuration properties that have been set (i.e. not the
default).
- * This creates a new copy of the config properties in the form of a Map.
- *
- * @group config
- * @since 1.0.0
- */
- def getAllConfs: immutable.Map[String, String] = {
- sparkSession.conf.getAll
- }
-
- /**
- * :: Experimental ::
- * A collection of methods that are considered experimental, but can be used
to hook into
- * the query planner for advanced functionality.
- *
- * @group basic
- * @since 1.3.0
- */
+ /** @inheritdoc */
@Experimental
@transient
@Unstable
def experimental: ExperimentalMethods = sparkSession.experimental
- /**
- * Returns a `DataFrame` with no rows or columns.
- *
- * @group basic
- * @since 1.3.0
- */
- def emptyDataFrame: DataFrame = sparkSession.emptyDataFrame
-
- /**
- * A collection of methods for registering user-defined functions (UDF).
- *
- * The following example registers a Scala closure as UDF:
- * {{{
- * sqlContext.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 +
arg1)
- * }}}
- *
- * The following example registers a UDF in Java:
- * {{{
- * sqlContext.udf().register("myUDF",
- * (Integer arg1, String arg2) -> arg2 + arg1,
- * DataTypes.StringType);
- * }}}
- *
- * @note The user-defined functions must be deterministic. Due to
optimization,
- * duplicate invocations may be eliminated or the function may even be
invoked more times than
- * it is present in the query.
- *
- * @group basic
- * @since 1.3.0
- */
+ /** @inheritdoc */
def udf: UDFRegistration = sparkSession.udf
- /**
- * Returns true if the table is currently cached in-memory.
- * @group cachemgmt
- * @since 1.3.0
- */
- def isCached(tableName: String): Boolean = {
- sparkSession.catalog.isCached(tableName)
- }
-
- /**
- * Caches the specified table in-memory.
- * @group cachemgmt
- * @since 1.3.0
- */
- def cacheTable(tableName: String): Unit = {
- sparkSession.catalog.cacheTable(tableName)
- }
-
- /**
- * Removes the specified table from the in-memory cache.
- * @group cachemgmt
- * @since 1.3.0
- */
- def uncacheTable(tableName: String): Unit = {
- sparkSession.catalog.uncacheTable(tableName)
- }
-
- /**
- * Removes all cached tables from the in-memory cache.
- * @since 1.3.0
- */
- def clearCache(): Unit = {
- sparkSession.catalog.clearCache()
- }
-
// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
- /**
- * (Scala-specific) Implicit methods available in Scala for converting
- * common Scala objects into `DataFrame`s.
- *
- * {{{
- * val sqlContext = new SQLContext(sc)
- * import sqlContext.implicits._
- * }}}
- *
- * @group basic
- * @since 1.3.0
- */
+
+ /** @inheritdoc */
object implicits extends SQLImplicits {
+ /** @inheritdoc */
override protected def session: SparkSession = sparkSession
}
+
// scalastyle:on
/**
- * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
- *
- * @group dataframes
- * @since 1.3.0
+ * Creates a DataFrame from an RDD[Row]. User can specify whether the input
rows should be
+ * converted to Catalyst rows.
*/
- def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
- sparkSession.createDataFrame(rdd)
+ private[sql] def internalCreateDataFrame(
+ catalystRows: RDD[InternalRow],
+ schema: StructType,
+ isStreaming: Boolean = false): DataFrame = {
+ sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
}
- /**
- * Creates a DataFrame from a local Seq of Product.
- *
- * @group dataframes
- * @since 1.3.0
- */
- def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
- sparkSession.createDataFrame(data)
- }
+ /** @inheritdoc */
+ def read: DataFrameReader = sparkSession.read
- /**
- * Convert a `BaseRelation` created for external data sources into a
`DataFrame`.
- *
- * @group dataframes
- * @since 1.3.0
- */
- def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
- sparkSession.baseRelationToDataFrame(baseRelation)
- }
+ /** @inheritdoc */
+ def readStream: DataStreamReader = sparkSession.readStream
/**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given
schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- * Example:
- * {{{
- * import org.apache.spark.sql._
- * import org.apache.spark.sql.types._
- * val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- *
- * val schema =
- * StructType(
- * StructField("name", StringType, false) ::
- * StructField("age", IntegerType, true) :: Nil)
- *
- * val people =
- * sc.textFile("examples/src/main/resources/people.txt").map(
- * _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
- * val dataFrame = sqlContext.createDataFrame(people, schema)
- * dataFrame.printSchema
- * // root
- * // |-- name: string (nullable = false)
- * // |-- age: integer (nullable = true)
- *
- * dataFrame.createOrReplaceTempView("people")
- * sqlContext.sql("select name from people").collect.foreach(println)
- * }}}
- *
- * @group dataframes
- * @since 1.3.0
+ * Registers the given `DataFrame` as a temporary table in the catalog.
Temporary tables exist
+ * only during the lifetime of this instance of SQLContext.
*/
- @DeveloperApi
- def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
- sparkSession.createDataFrame(rowRDD, schema)
+ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String):
Unit = {
+ df.createOrReplaceTempView(tableName)
}
/**
- * Creates a [[Dataset]] from a local Seq of data of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * == Example ==
- *
- * {{{
- *
- * import spark.implicits._
- * case class Person(name: String, age: Long)
- * val data = Seq(Person("Michael", 29), Person("Andy", 30),
Person("Justin", 19))
- * val ds = spark.createDataset(data)
- *
- * ds.show()
- * // +-------+---+
- * // | name|age|
- * // +-------+---+
- * // |Michael| 29|
- * // | Andy| 30|
- * // | Justin| 19|
- * // +-------+---+
- * }}}
+ * Returns a `StreamingQueryManager` that allows managing all the
+ * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active
on `this` context.
*
* @since 2.0.0
- * @group dataset
*/
- def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
- sparkSession.createDataset(data)
- }
+ def streams: StreamingQueryManager = sparkSession.streams
- /**
- * Creates a [[Dataset]] from an RDD of a given type. This method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * @since 2.0.0
- * @group dataset
- */
- def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
- sparkSession.createDataset(data)
- }
+ /** @inheritdoc */
+ override def sparkContext: SparkContext = super.sparkContext
- /**
- * Creates a [[Dataset]] from a `java.util.List` of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * == Java Example ==
- *
- * {{{
- * List<String> data = Arrays.asList("hello", "world");
- * Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
- * }}}
- *
- * @since 2.0.0
- * @group dataset
- */
- def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
- sparkSession.createDataset(data)
- }
+ /** @inheritdoc */
+ override def newSession(): SQLContext = sparkSession.newSession().sqlContext
- /**
- * Creates a DataFrame from an RDD[Row]. User can specify whether the input
rows should be
- * converted to Catalyst rows.
- */
- private[sql]
- def internalCreateDataFrame(
- catalystRows: RDD[InternalRow],
- schema: StructType,
- isStreaming: Boolean = false) = {
- sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
- }
+ /** @inheritdoc */
+ override def emptyDataFrame: Dataset[Row] = super.emptyDataFrame
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the
given schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- *
- * @group dataframes
- * @since 1.3.0
- */
- @DeveloperApi
- def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
- sparkSession.createDataFrame(rowRDD, schema)
- }
+ /** @inheritdoc */
+ override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]):
Dataset[Row] =
+ super.createDataFrame(rdd)
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
the given schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided List matches
- * the provided schema. Otherwise, there will be runtime exception.
- *
- * @group dataframes
- * @since 1.6.0
- */
+ /** @inheritdoc */
+ override def createDataFrame[A <: Product: TypeTag](data: Seq[A]):
Dataset[Row] =
+ super.createDataFrame(data)
+
+ /** @inheritdoc */
+ override def baseRelationToDataFrame(baseRelation: BaseRelation):
Dataset[Row] =
+ super.baseRelationToDataFrame(baseRelation)
+
+ /** @inheritdoc */
@DeveloperApi
- def createDataFrame(rows: java.util.List[Row], schema: StructType):
DataFrame = {
- sparkSession.createDataFrame(rows, schema)
- }
+ override def createDataFrame(rowRDD: RDD[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rowRDD, schema)
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- * @group dataframes
- * @since 1.3.0
- */
- def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
- sparkSession.createDataFrame(rdd, beanClass)
- }
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] =
super.createDataset(data)
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- * @group dataframes
- * @since 1.3.0
- */
- def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
- sparkSession.createDataFrame(rdd, beanClass)
- }
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] =
super.createDataset(data)
- /**
- * Applies a schema to a List of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- * @group dataframes
- * @since 1.6.0
- */
- def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame
= {
- sparkSession.createDataFrame(data, beanClass)
- }
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: JList[T]): Dataset[T] =
+ super.createDataset(data)
- /**
- * Returns a [[DataFrameReader]] that can be used to read non-streaming data
in as a
- * `DataFrame`.
- * {{{
- * sqlContext.read.parquet("/path/to/file.parquet")
- * sqlContext.read.schema(schema).json("/path/to/file.json")
- * }}}
- *
- * @group genericdata
- * @since 1.4.0
- */
- def read: DataFrameReader = sparkSession.read
+ /** @inheritdoc */
+ @DeveloperApi
+ override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rowRDD, schema)
+ /** @inheritdoc */
+ @DeveloperApi
+ override def createDataFrame(rows: JList[Row], schema: StructType):
Dataset[Row] =
+ super.createDataFrame(rows, schema)
- /**
- * Returns a `DataStreamReader` that can be used to read streaming data in
as a `DataFrame`.
- * {{{
- * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
- *
sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
- * }}}
- *
- * @since 2.0.0
- */
- def readStream: DataStreamReader = sparkSession.readStream
+ /** @inheritdoc */
+ override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row]
=
+ super.createDataFrame(rdd, beanClass)
+ /** @inheritdoc */
+ override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]):
Dataset[Row] =
+ super.createDataFrame(rdd, beanClass)
- /**
- * Creates an external table from the given path and returns the
corresponding DataFrame.
- * It will use the default data source configured by
spark.sql.sources.default.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(tableName: String, path: String): DataFrame = {
- sparkSession.catalog.createTable(tableName, path)
- }
+ /** @inheritdoc */
+ override def createDataFrame(data: JList[_], beanClass: Class[_]):
Dataset[Row] =
+ super.createDataFrame(data, beanClass)
- /**
- * Creates an external table from the given path based on a data source
- * and returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
+ /** @inheritdoc */
+ override def createExternalTable(tableName: String, path: String):
Dataset[Row] =
+ super.createExternalTable(tableName, path)
+
+ /** @inheritdoc */
+ override def createExternalTable(
tableName: String,
path: String,
- source: String): DataFrame = {
- sparkSession.catalog.createTable(tableName, path, source)
+ source: String): Dataset[Row] = {
+ super.createExternalTable(tableName, path, source)
}
- /**
- * Creates an external table from the given path based on a data source and
a set of options.
- * Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
+ /** @inheritdoc */
+ override def createExternalTable(
tableName: String,
source: String,
- options: java.util.Map[String, String]): DataFrame = {
- sparkSession.catalog.createTable(tableName, source, options)
+ options: JMap[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, options)
}
- /**
- * (Scala-specific)
- * Creates an external table from the given path based on a data source and
a set of options.
- * Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
+ /** @inheritdoc */
+ override def createExternalTable(
tableName: String,
source: String,
- options: Map[String, String]): DataFrame = {
- sparkSession.catalog.createTable(tableName, source, options)
+ options: Map[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, options)
}
- /**
- * Create an external table from the given path based on a data source, a
schema and
- * a set of options. Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
+ /** @inheritdoc */
+ override def createExternalTable(
tableName: String,
source: String,
schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- sparkSession.catalog.createTable(tableName, source, schema, options)
+ options: JMap[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, schema, options)
}
- /**
- * (Scala-specific)
- * Create an external table from the given path based on a data source, a
schema and
- * a set of options. Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
- def createExternalTable(
+ /** @inheritdoc */
+ override def createExternalTable(
tableName: String,
source: String,
schema: StructType,
- options: Map[String, String]): DataFrame = {
- sparkSession.catalog.createTable(tableName, source, schema, options)
- }
-
- /**
- * Registers the given `DataFrame` as a temporary table in the catalog.
Temporary tables exist
- * only during the lifetime of this instance of SQLContext.
- */
- private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String):
Unit = {
- df.createOrReplaceTempView(tableName)
+ options: Map[String, String]): Dataset[Row] = {
+ super.createExternalTable(tableName, source, schema, options)
}
- /**
- * Drops the temporary table with the given table name in the catalog. If
the table has been
- * cached/persisted before, it's also unpersisted.
- *
- * @param tableName the name of the table to be unregistered.
- * @group basic
- * @since 1.3.0
- */
- def dropTempTable(tableName: String): Unit = {
- sparkSession.catalog.dropTempView(tableName)
- }
+ /** @inheritdoc */
+ override def range(end: Long): Dataset[Row] = super.range(end)
- /**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from 0 to `end` (exclusive) with step value 1.
- *
- * @since 1.4.1
- * @group dataframe
- */
- def range(end: Long): DataFrame = sparkSession.range(end).toDF()
-
- /**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with step value 1.
- *
- * @since 1.4.0
- * @group dataframe
- */
- def range(start: Long, end: Long): DataFrame = sparkSession.range(start,
end).toDF()
-
- /**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with a step value.
- *
- * @since 2.0.0
- * @group dataframe
- */
- def range(start: Long, end: Long, step: Long): DataFrame = {
- sparkSession.range(start, end, step).toDF()
- }
-
- /**
- * Creates a `DataFrame` with a single `LongType` column named `id`,
containing elements
- * in an range from `start` to `end` (exclusive) with an step value, with
partition number
- * specified.
- *
- * @since 1.4.0
- * @group dataframe
- */
- def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame
= {
- sparkSession.range(start, end, step, numPartitions).toDF()
- }
-
- /**
- * Executes a SQL query using Spark, returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
- *
- * @group basic
- * @since 1.3.0
- */
- def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText)
-
- /**
- * Returns the specified table as a `DataFrame`.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- def table(tableName: String): DataFrame = {
- sparkSession.table(tableName)
- }
+ /** @inheritdoc */
+ override def range(start: Long, end: Long): Dataset[Row] =
super.range(start, end)
- /**
- * Returns a `DataFrame` containing names of existing tables in the current
database.
- * The returned DataFrame has three columns, database, tableName and
isTemporary (a Boolean
- * indicating if a table is a temporary one or not).
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- def tables(): DataFrame = {
- Dataset.ofRows(sparkSession, ShowTables(CurrentNamespace, None))
- }
+ /** @inheritdoc */
+ override def range(start: Long, end: Long, step: Long): Dataset[Row] =
+ super.range(start, end, step)
- /**
- * Returns a `DataFrame` containing names of existing tables in the given
database.
- * The returned DataFrame has three columns, database, tableName and
isTemporary (a Boolean
- * indicating if a table is a temporary one or not).
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- def tables(databaseName: String): DataFrame = {
- Dataset.ofRows(sparkSession,
ShowTables(UnresolvedNamespace(Seq(databaseName)), None))
- }
+ /** @inheritdoc */
+ override def range(start: Long, end: Long, step: Long, numPartitions: Int):
Dataset[Row] =
+ super.range(start, end, step, numPartitions)
- /**
- * Returns a `StreamingQueryManager` that allows managing all the
- * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active
on `this` context.
- *
- * @since 2.0.0
- */
- def streams: StreamingQueryManager = sparkSession.streams
+ /** @inheritdoc */
+ override def sql(sqlText: String): Dataset[Row] = super.sql(sqlText)
- /**
- * Returns the names of tables in the current database as an array.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- def tableNames(): Array[String] = {
- tableNames(sparkSession.catalog.currentDatabase)
- }
+ /** @inheritdoc */
+ override def table(tableName: String): Dataset[Row] = super.table(tableName)
- /**
- * Returns the names of tables in the given database as an array.
- *
- * @group ddl_ops
- * @since 1.3.0
- */
- def tableNames(databaseName: String): Array[String] = {
- sessionState.catalog.listTables(databaseName).map(_.table).toArray
- }
+ /** @inheritdoc */
+ override def tables(): DataFrame = super.tables()
- ////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////
- // Deprecated methods
- ////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////
+ /** @inheritdoc */
+ override def tables(databaseName: String): DataFrame =
super.tables(databaseName)
- /**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
- */
- @deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
- createDataFrame(rowRDD, schema)
- }
+ /** @inheritdoc */
+ override def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row]
=
+ super.applySchema(rowRDD, schema)
- /**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
- */
- @deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
- createDataFrame(rowRDD, schema)
- }
+ /** @inheritdoc */
+ override def applySchema(rowRDD: JavaRDD[Row], schema: StructType):
Dataset[Row] =
+ super.applySchema(rowRDD, schema)
- /**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
- */
- @deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
- createDataFrame(rdd, beanClass)
- }
+ /** @inheritdoc */
+ override def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] =
+ super.applySchema(rdd, beanClass)
- /**
- * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
- */
- @deprecated("Use createDataFrame instead.", "1.3.0")
- def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
- createDataFrame(rdd, beanClass)
- }
+ /** @inheritdoc */
+ override def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row]
=
+ super.applySchema(rdd, beanClass)
- /**
- * Loads a Parquet file, returning the result as a `DataFrame`. This
function returns an empty
- * `DataFrame` if no paths are passed in.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().parquet()`.
- */
- @deprecated("Use read.parquet() instead.", "1.4.0")
+ /** @inheritdoc */
@scala.annotation.varargs
- def parquetFile(paths: String*): DataFrame = {
- if (paths.isEmpty) {
- emptyDataFrame
- } else {
- read.parquet(paths : _*)
- }
- }
+ override def parquetFile(paths: String*): Dataset[Row] =
super.parquetFile(paths: _*)
- /**
- * Loads a JSON file (one object per line), returning the result as a
`DataFrame`.
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String): DataFrame = {
- read.json(path)
- }
+ /** @inheritdoc */
+ override def jsonFile(path: String): Dataset[Row] = super.jsonFile(path)
- /**
- * Loads a JSON file (one object per line) and applies the given schema,
- * returning the result as a `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String, schema: StructType): DataFrame = {
- read.schema(schema).json(path)
- }
+ /** @inheritdoc */
+ override def jsonFile(path: String, schema: StructType): Dataset[Row] =
+ super.jsonFile(path, schema)
- /**
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonFile(path: String, samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(path)
- }
+ /** @inheritdoc */
+ override def jsonFile(path: String, samplingRatio: Double): Dataset[Row] =
+ super.jsonFile(path, samplingRatio)
- /**
- * Loads an RDD[String] storing JSON objects (one object per record),
returning the result as a
- * `DataFrame`.
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String]): Dataset[Row] = read.json(json)
- /**
- * Loads an RDD[String] storing JSON objects (one object per record),
returning the result as a
- * `DataFrame`.
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String]): Dataset[Row] = read.json(json)
- /**
- * Loads an RDD[String] storing JSON objects (one object per record) and
applies the given schema,
- * returning the result as a `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
- read.schema(schema).json(json)
- }
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] =
+ super.jsonRDD(json, schema)
- /**
- * Loads an JavaRDD[String] storing JSON objects (one object per record) and
applies the given
- * schema, returning the result as a `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
- read.schema(schema).json(json)
- }
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String], schema: StructType):
Dataset[Row] =
+ super.jsonRDD(json, schema)
- /**
- * Loads an RDD[String] storing JSON objects (one object per record)
inferring the
- * schema, returning the result as a `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(json)
- }
+ /** @inheritdoc */
+ override def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row]
=
+ super.jsonRDD(json, samplingRatio)
- /**
- * Loads a JavaRDD[String] storing JSON objects (one object per record)
inferring the
- * schema, returning the result as a `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().json()`.
- */
- @deprecated("Use read.json() instead.", "1.4.0")
- def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(json)
- }
+ /** @inheritdoc */
+ override def jsonRDD(json: JavaRDD[String], samplingRatio: Double):
Dataset[Row] =
+ super.jsonRDD(json, samplingRatio)
- /**
- * Returns the dataset stored at path as a DataFrame,
- * using the default data source configured by spark.sql.sources.default.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by `read().load(path)`.
- */
- @deprecated("Use read.load(path) instead.", "1.4.0")
- def load(path: String): DataFrame = {
- read.load(path)
- }
+ /** @inheritdoc */
+ override def load(path: String): Dataset[Row] = super.load(path)
- /**
- * Returns the dataset stored at path as a DataFrame, using the given data
source.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
- */
- @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
- def load(path: String, source: String): DataFrame = {
- read.format(source).load(path)
- }
+ /** @inheritdoc */
+ override def load(path: String, source: String): Dataset[Row] =
super.load(path, source)
- /**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
- */
- @deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
- def load(source: String, options: java.util.Map[String, String]): DataFrame
= {
- read.options(options).format(source).load()
- }
+ /** @inheritdoc */
+ override def load(source: String, options: JMap[String, String]):
Dataset[Row] =
+ super.load(source, options)
- /**
- * (Scala-specific) Returns the dataset specified by the given data source
and
- * a set of options as a DataFrame.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by
`read().format(source).options(options).load()`.
- */
- @deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
- def load(source: String, options: Map[String, String]): DataFrame = {
- read.options(options).format(source).load()
- }
+ /** @inheritdoc */
+ override def load(source: String, options: Map[String, String]):
Dataset[Row] =
+ super.load(source, options)
- /**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of
the DataFrame.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by
- * `read().format(source).schema(schema).options(options).load()`.
- */
- @deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
- def load(
+ /** @inheritdoc */
+ override def load(
source: String,
schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- read.format(source).schema(schema).options(options).load()
+ options: JMap[String, String]): Dataset[Row] = {
+ super.load(source, schema, options)
}
- /**
- * (Scala-specific) Returns the dataset specified by the given data source
and
- * a set of options as a DataFrame, using the given schema as the schema of
the DataFrame.
- *
- * @group genericdata
- * @deprecated As of 1.4.0, replaced by
- * `read().format(source).schema(schema).options(options).load()`.
- */
- @deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
- def load(source: String, schema: StructType, options: Map[String, String]):
DataFrame = {
- read.format(source).schema(schema).options(options).load()
+ /** @inheritdoc */
+ override def load(
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): Dataset[Row] = {
+ super.load(source, schema, options)
}
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
- */
- @deprecated("Use read.jdbc() instead.", "1.4.0")
- def jdbc(url: String, table: String): DataFrame = {
- read.jdbc(url, table, new Properties)
- }
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String): Dataset[Row] =
super.jdbc(url, table)
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table. Partitions of the table will be retrieved in parallel
based on the parameters
- * passed to this function.
- *
- * @param columnName the name of a column of integral type that will be used
for partitioning.
- * @param lowerBound the minimum value of `columnName` used to decide
partition stride
- * @param upperBound the maximum value of `columnName` used to decide
partition stride
- * @param numPartitions the number of partitions. the range
`minValue`-`maxValue` will be split
- * evenly into this many partitions
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
- */
- @deprecated("Use read.jdbc() instead.", "1.4.0")
- def jdbc(
+ /** @inheritdoc */
+ override def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
- numPartitions: Int): DataFrame = {
- read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions,
new Properties)
+ numPartitions: Int): Dataset[Row] = {
+ super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions)
}
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table. The theParts parameter gives a list expressions
- * suitable for inclusion in WHERE clauses; each one defines one partition
- * of the `DataFrame`.
- *
- * @group specificdata
- * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
- */
- @deprecated("Use read.jdbc() instead.", "1.4.0")
- def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
- read.jdbc(url, table, theParts, new Properties)
- }
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String, theParts: Array[String]):
Dataset[Row] =
+ super.jdbc(url, table, theParts)
}
-/**
- * This SQLContext object contains utility functions to create a singleton
SQLContext instance,
- * or to get the created SQLContext instance.
- *
- * It also provides utility functions to support preference for threads in
multiple sessions
- * scenario, setActive could set a SQLContext for current thread, which will
be returned by
- * getOrCreate instead of the global one.
- */
-object SQLContext {
+object SQLContext extends api.SQLContextCompanion {
- /**
- * Get the singleton SQLContext if it exists or create a new one using the
given SparkContext.
- *
- * This function can be used to create a singleton SQLContext object that
can be shared across
- * the JVM.
- *
- * If there is an active SQLContext for current thread, it will be returned
instead of the global
- * one.
- *
- * @since 1.5.0
- */
- @deprecated("Use SparkSession.builder instead", "2.0.0")
+ override private[sql] type SQLContextImpl = SQLContext
+ override private[sql] type SparkContextImpl = SparkContext
+
+ /** @inheritdoc */
def getOrCreate(sparkContext: SparkContext): SQLContext = {
SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
}
- /**
- * Changes the SQLContext that will be returned in this thread and its
children when
- * SQLContext.getOrCreate() is called. This can be used to ensure that a
given thread receives
- * a SQLContext with an isolated session, instead of the global (first
created) context.
- *
- * @since 1.6.0
- */
- @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
- def setActive(sqlContext: SQLContext): Unit = {
- SparkSession.setActiveSession(sqlContext.sparkSession)
- }
-
- /**
- * Clears the active SQLContext for current thread. Subsequent calls to
getOrCreate will
- * return the first created context instead of a thread-local override.
- *
- * @since 1.6.0
- */
- @deprecated("Use SparkSession.clearActiveSession instead", "2.0.0")
- def clearActive(): Unit = {
- SparkSession.clearActiveSession()
- }
+ /** @inheritdoc */
+ override def setActive(sqlContext: SQLContext): Unit =
super.setActive(sqlContext)
/**
- * Converts an iterator of Java Beans to InternalRow using the provided
- * bean info & schema. This is not related to the singleton, but is a static
- * method for internal use.
+ * Converts an iterator of Java Beans to InternalRow using the provided bean
info & schema. This
+ * is not related to the singleton, but is a static method for internal use.
*/
private[sql] def beansToRows(
data: Iterator[_],
@@ -1058,7 +400,9 @@ object SQLContext {
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any
=> InternalRow = {
val methodConverters =
- JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+ JavaTypeInference
+ .getJavaBeanReadableProperties(cls)
+ .zip(fieldTypes)
.map { case (property, fieldType) =>
val method = property.getReadMethod
method -> createConverter(method.getReturnType, fieldType)
@@ -1067,16 +411,17 @@ object SQLContext {
if (value == null) {
null
} else {
- new GenericInternalRow(
- methodConverters.map { case (method, converter) =>
- converter(method.invoke(value))
- })
+ new GenericInternalRow(methodConverters.map { case (method,
converter) =>
+ converter(method.invoke(value))
+ })
}
}
+
def createConverter(cls: Class[_], dataType: DataType): Any => Any =
dataType match {
case struct: StructType => createStructConverter(cls,
struct.map(_.dataType))
case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
}
+
val dataConverter = createStructConverter(beanClass, attrs.map(_.dataType))
data.map(dataConverter)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index d81768c0077e..ea0d405d2a8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -99,6 +99,29 @@ class SQLContextSuite extends SparkFunSuite with
SharedSparkContext {
assert(sqlContext.tables().filter("tableName =
'listtablessuitetable'").count() === 0)
}
+ test("get tables from a database") {
+ val sqlContext = SQLContext.getOrCreate(sc)
+
+ try {
+ sqlContext.sql("CREATE DATABASE IF NOT EXISTS temp_db_1")
+ sqlContext.sql("CREATE TABLE temp_db_1.temp_table_1 (key int)")
+ sqlContext.sql("INSERT INTO temp_db_1.temp_table_1 VALUES (1)")
+
+
assert(sqlContext.tableNames("temp_db_1").sameElements(Array("temp_table_1")))
+
+ assert(sqlContext.tables("temp_db_1").collect().toSeq ==
+ Row("temp_db_1", "temp_table_1", false) :: Nil)
+
+ assert(sqlContext.tables().collect().toSeq == Nil)
+ sqlContext.sql("USE temp_db_1")
+ assert(sqlContext.tableNames().sameElements(Array("temp_table_1")))
+ assert(sqlContext.tables().collect().toSeq == Row("temp_db_1",
"temp_table_1", false) :: Nil)
+ } finally {
+ sqlContext.sql("USE default")
+ sqlContext.sql("DROP DATABASE IF EXISTS temp_db_1 CASCADE")
+ }
+ }
+
test("getting all tables with a database name has no impact on returned
table names") {
val sqlContext = SQLContext.getOrCreate(sc)
val df = sqlContext.range(10)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]