This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ebc358c [SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
ebc358c is described below
commit ebc358c8d2b6d67c7319be006452c9c993b7a098
Author: gatorsmile <[email protected]>
AuthorDate: Thu Mar 26 23:49:24 2020 -0700
[SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
### What changes were proposed in this pull request?
Based on the discussion in the mailing list [[Proposal] Modification to
Spark's Semantic Versioning
Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html)
, this PR adds back the following APIs, whose maintenance cost is relatively
small (a brief migration sketch follows the list):
- SQLContext.applySchema
- SQLContext.parquetFile
- SQLContext.jsonFile
- SQLContext.jsonRDD
- SQLContext.load
- SQLContext.jdbc
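Most of the restored methods are thin forwarders to the `DataFrameReader` API (`applySchema` forwards to `createDataFrame`), as the diff below shows. A minimal migration sketch; the local session and paths here are illustrative only, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session used only for illustration.
val spark = SparkSession.builder().master("local[*]").appName("deprecated-api-sketch").getOrCreate()
val sqlContext = spark.sqlContext

// Deprecated call kept for source compatibility...
val df1 = sqlContext.parquetFile("/tmp/example.parquet")
// ...and its non-deprecated equivalent on DataFrameReader.
val df2 = spark.read.parquet("/tmp/example.parquet")

// The generic loader follows the same pattern.
val df3 = sqlContext.load("parquet", Map("path" -> "/tmp/example.parquet"))
val df4 = spark.read.format("parquet").options(Map("path" -> "/tmp/example.parquet")).load()
```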
### Why are the changes needed?
Avoid breaking commonly used APIs.
### Does this PR introduce any user-facing change?
Adding back the APIs that were removed in the 3.0 branch does not introduce
user-facing changes, because Spark 3.0 has not been released yet.
### How was this patch tested?
The existing tests.
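The new `DeprecatedAPISuite` added below exercises these methods with a write-then-read round trip. A condensed, self-contained variant of that pattern (the local session and temp path are illustrative, not the suite's actual code, which uses `SharedSparkSession`):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Illustrative local session; the real suite mixes in SharedSparkSession instead.
val spark = SparkSession.builder().master("local[*]").appName("roundtrip").getOrCreate()
val sqlContext = spark.sqlContext
val path = Files.createTempDirectory("deprecated-api").resolve("data").toString

// Write with the current API, read back through the restored deprecated method.
val expected = spark.range(10).toDF()
expected.write.parquet(path)
val actual = sqlContext.parquetFile(path)
assert(actual.collect().map(_.getLong(0)).sorted.sameElements(expected.collect().map(_.getLong(0)).sorted))

spark.stop()
```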
Closes #27839 from gatorsmile/addAPIBackV3.
Lead-authored-by: gatorsmile <[email protected]>
Co-authored-by: yi.wu <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit b7e4cc775b7eac68606d1f385911613f5139db1b)
Signed-off-by: gatorsmile <[email protected]>
---
.../scala/org/apache/spark/sql/SQLContext.scala | 283 +++++++++++++++++++++
.../org/apache/spark/sql/DeprecatedAPISuite.scala | 106 ++++++++
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 +
3 files changed, 403 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2054874..592c64c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -611,6 +611,289 @@ class SQLContext private[sql](val sparkSession: SparkSession)
sessionState.catalog.listTables(databaseName).map(_.table).toArray
}
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+ // Deprecated methods
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ */
+ @deprecated("Use createDataFrame instead.", "1.3.0")
+ def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+ createDataFrame(rowRDD, schema)
+ }
+
+ /**
+ * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ */
+ @deprecated("Use createDataFrame instead.", "1.3.0")
+ def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+ createDataFrame(rowRDD, schema)
+ }
+
+ /**
+ * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ */
+ @deprecated("Use createDataFrame instead.", "1.3.0")
+ def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+ createDataFrame(rdd, beanClass)
+ }
+
+ /**
+ * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+ */
+ @deprecated("Use createDataFrame instead.", "1.3.0")
+ def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+ createDataFrame(rdd, beanClass)
+ }
+
+ /**
+ * Loads a Parquet file, returning the result as a `DataFrame`. This function returns an empty
+ * `DataFrame` if no paths are passed in.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().parquet()`.
+ */
+ @deprecated("Use read.parquet() instead.", "1.4.0")
+ @scala.annotation.varargs
+ def parquetFile(paths: String*): DataFrame = {
+ if (paths.isEmpty) {
+ emptyDataFrame
+ } else {
+ read.parquet(paths : _*)
+ }
+ }
+
+ /**
+ * Loads a JSON file (one object per line), returning the result as a `DataFrame`.
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonFile(path: String): DataFrame = {
+ read.json(path)
+ }
+
+ /**
+ * Loads a JSON file (one object per line) and applies the given schema,
+ * returning the result as a `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonFile(path: String, schema: StructType): DataFrame = {
+ read.schema(schema).json(path)
+ }
+
+ /**
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(path)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * `DataFrame`.
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * `DataFrame`.
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+ * returning the result as a `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+ read.schema(schema).json(json)
+ }
+
+ /**
+ * Loads a JavaRDD[String] storing JSON objects (one object per record) and applies the given
+ * schema, returning the result as a `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+ read.schema(schema).json(json)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(json)
+ }
+
+ /**
+ * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().json()`.
+ */
+ @deprecated("Use read.json() instead.", "1.4.0")
+ def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(json)
+ }
+
+ /**
+ * Returns the dataset stored at path as a DataFrame,
+ * using the default data source configured by spark.sql.sources.default.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by `read().load(path)`.
+ */
+ @deprecated("Use read.load(path) instead.", "1.4.0")
+ def load(path: String): DataFrame = {
+ read.load(path)
+ }
+
+ /**
+ * Returns the dataset stored at path as a DataFrame, using the given data source.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
+ */
+ @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
+ def load(path: String, source: String): DataFrame = {
+ read.format(source).load(path)
+ }
+
+ /**
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
+ */
+ @deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
+ def load(source: String, options: java.util.Map[String, String]): DataFrame
= {
+ read.options(options).format(source).load()
+ }
+
+ /**
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
+ */
+ @deprecated("Use read.format(source).options(options).load() instead.",
"1.4.0")
+ def load(source: String, options: Map[String, String]): DataFrame = {
+ read.options(options).format(source).load()
+ }
+
+ /**
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by
+ * `read().format(source).schema(schema).options(options).load()`.
+ */
+ @deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
+ def load(
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ read.format(source).schema(schema).options(options).load()
+ }
+
+ /**
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ *
+ * @group genericdata
+ * @deprecated As of 1.4.0, replaced by
+ * `read().format(source).schema(schema).options(options).load()`.
+ */
+ @deprecated("Use read.format(source).schema(schema).options(options).load()
instead.", "1.4.0")
+ def load(source: String, schema: StructType, options: Map[String, String]):
DataFrame = {
+ read.format(source).schema(schema).options(options).load()
+ }
+
+ /**
+ * Construct a `DataFrame` representing the database table accessible via JDBC URL
+ * url named table.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ */
+ @deprecated("Use read.jdbc() instead.", "1.4.0")
+ def jdbc(url: String, table: String): DataFrame = {
+ read.jdbc(url, table, new Properties)
+ }
+
+ /**
+ * Construct a `DataFrame` representing the database table accessible via JDBC URL
+ * url named table. Partitions of the table will be retrieved in parallel based on the parameters
+ * passed to this function.
+ *
+ * @param columnName the name of a column of integral type that will be used for partitioning.
+ * @param lowerBound the minimum value of `columnName` used to decide partition stride
+ * @param upperBound the maximum value of `columnName` used to decide partition stride
+ * @param numPartitions the number of partitions. The range `lowerBound`-`upperBound` will be split
+ * evenly into this many partitions
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ */
+ @deprecated("Use read.jdbc() instead.", "1.4.0")
+ def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int): DataFrame = {
+ read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
+ }
+
+ /**
+ * Construct a `DataFrame` representing the database table accessible via JDBC URL
+ * url named table. The theParts parameter gives a list of expressions
+ * suitable for inclusion in WHERE clauses; each one defines one partition
+ * of the `DataFrame`.
+ *
+ * @group specificdata
+ * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+ */
+ @deprecated("Use read.jdbc() instead.", "1.4.0")
+ def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+ read.jdbc(url, table, theParts, new Properties)
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
new file mode 100644
index 0000000..c31ef99
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+
+ test("SQLContext.applySchema") {
+ val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 18)))
+ val schema = StructType(StructField("name", StringType, false) :: StructField("age", IntegerType, true) :: Nil)
+ val sqlContext = spark.sqlContext
+ checkAnswer(sqlContext.applySchema(rowRdd, schema), Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+ checkAnswer(sqlContext.applySchema(rowRdd.toJavaRDD(), schema),
+ Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+ }
+
+ test("SQLContext.parquetFile") {
+ val sqlContext = spark.sqlContext
+ withTempDir { dir =>
+ val parquetFile = s"${dir.toString}/${System.currentTimeMillis()}"
+ val expectDF = spark.range(10).toDF()
+ expectDF.write.parquet(parquetFile)
+ val parquetDF = sqlContext.parquetFile(parquetFile)
+ checkAnswer(parquetDF, expectDF)
+ }
+ }
+
+ test("SQLContext.jsonFile") {
+ val sqlContext = spark.sqlContext
+ withTempDir { dir =>
+ val jsonFile = s"${dir.toString}/${System.currentTimeMillis()}"
+ val expectDF = spark.range(10).toDF()
+ expectDF.write.json(jsonFile)
+ var jsonDF = sqlContext.jsonFile(jsonFile)
+ checkAnswer(jsonDF, expectDF)
+ assert(jsonDF.schema === expectDF.schema.asNullable)
+
+ var schema = expectDF.schema
+ jsonDF = sqlContext.jsonFile(jsonFile, schema)
+ checkAnswer(jsonDF, expectDF)
+ assert(jsonDF.schema === schema.asNullable)
+
+ jsonDF = sqlContext.jsonFile(jsonFile, 0.9)
+ checkAnswer(jsonDF, expectDF)
+
+ val jsonRDD = sparkContext.parallelize(Seq("{\"name\":\"Jack\",\"age\":20}",
+ "{\"name\":\"Marry\",\"age\":18}"))
+ jsonDF = sqlContext.jsonRDD(jsonRDD)
+ checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+ jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
+ checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+
+ schema = StructType(StructField("name", StringType, false) ::
+ StructField("age", IntegerType, false) :: Nil)
+ jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
+ checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+ jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
+ checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+
+
+ jsonDF = sqlContext.jsonRDD(jsonRDD, 0.9)
+ checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+ jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), 0.9)
+ checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+ }
+ }
+
+ test("SQLContext.load") {
+ withTempDir { dir =>
+ val path = s"${dir.toString}/${System.currentTimeMillis()}"
+ val expectDF = spark.range(10).toDF()
+ expectDF.write.parquet(path)
+ val sqlContext = spark.sqlContext
+
+ var loadDF = sqlContext.load(path)
+ checkAnswer(loadDF, expectDF)
+
+ loadDF = sqlContext.load(path, "parquet")
+ checkAnswer(loadDF, expectDF)
+
+ loadDF = sqlContext.load("parquet", Map("path" -> path))
+ checkAnswer(loadDF, expectDF)
+
+ loadDF = sqlContext.load("parquet", expectDF.schema, Map("path" -> path))
+ checkAnswer(loadDF, expectDF)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9cba95f..fd691f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1699,4 +1699,18 @@ class JDBCSuite extends QueryTest
assert(JdbcDialects.get("jdbc:teradata://localhost/db") === TeradataDialect)
assert(JdbcDialects.get("jdbc:Teradata://localhost/db") === TeradataDialect)
}
+
+ test("SQLContext.jdbc (deprecated)") {
+ val sqlContext = spark.sqlContext
+ var jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE")
+ checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row("joe 'foo' \"bar\"", 3) :: Nil)
+
+ jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
+ checkNumPartitions(jdbcDF, 3)
+ checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row("joe 'foo' \"bar\"", 3) :: Nil)
+
+ val parts = Array[String]("THEID = 2")
+ jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts)
+ checkAnswer(jdbcDF, Row("mary", 2) :: Nil)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]