This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2a449df [SPARK-31088][SQL] Add back HiveContext and createExternalTable
2a449df is described below
commit 2a449df305d5f8495959fd71d937e0f5f4fff87d
Author: gatorsmile <[email protected]>
AuthorDate: Thu Mar 26 23:51:15 2020 -0700
[SPARK-31088][SQL] Add back HiveContext and createExternalTable
### What changes were proposed in this pull request?
Based on the discussion in the mailing list thread [[Proposal] Modification to
Spark's Semantic Versioning
Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html),
this PR adds back the following APIs, whose maintenance cost is relatively
small (a brief usage sketch follows the list):
- HiveContext
- createExternalTable APIs
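For reference, a minimal Scala sketch (not part of this patch) of the restored
APIs; the master URL, app name, table name, and dataset path below are
hypothetical, and it assumes a Hive-enabled Spark 3.0 build with Parquet data
already written at that path:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RestoredApisSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("restored-apis"))

    // HiveContext compiles again, but remains deprecated since 2.0.0 in favor of
    // SparkSession.builder.enableHiveSupport().getOrCreate().
    val hc = new HiveContext(sc)

    // createExternalTable is back on SQLContext and Catalog; it remains deprecated
    // since 2.2.0 and simply delegates to createTable (see the diff below).
    val people = hc.createExternalTable("people", "/tmp/people_parquet")
    people.show()

    sc.stop()
  }
}
```

Both calls compile with deprecation warnings only; `createTable` and
`SparkSession.builder.enableHiveSupport()` remain the recommended replacements.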
### Why are the changes needed?
Avoid breaking commonly used APIs.
### Does this PR introduce any user-facing change?
No. Adding back the APIs that were removed in the 3.0 branch does not introduce
user-facing changes, because Spark 3.0 has not been released yet.
### How was this patch tested?
Added a new test suite for the createExternalTable APIs.
Closes #27815 from gatorsmile/addAPIsBack.
Lead-authored-by: gatorsmile <[email protected]>
Co-authored-by: yi.wu <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit b9eafcb52658b7f5ec60bb4ebcc9da0fde94e105)
Signed-off-by: gatorsmile <[email protected]>
---
docs/sql-migration-guide.md | 4 -
project/MimaExcludes.scala | 2 -
python/pyspark/__init__.py | 2 +-
python/pyspark/sql/__init__.py | 4 +-
python/pyspark/sql/catalog.py | 20 ++++
python/pyspark/sql/context.py | 67 +++++++++++++-
.../scala/org/apache/spark/sql/SQLContext.scala | 91 ++++++++++++++++++
.../org/apache/spark/sql/catalog/Catalog.scala | 102 +++++++++++++++++++-
.../DeprecatedCreateExternalTableSuite.scala | 85 +++++++++++++++++
.../org/apache/spark/sql/hive/HiveContext.scala | 63 +++++++++++++
.../sql/hive/HiveContextCompatibilitySuite.scala | 103 +++++++++++++++++++++
11 files changed, 532 insertions(+), 11 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index d2773d8..ab35e1f 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -309,10 +309,6 @@ license: |
### Others
- - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
-
- - In Spark 3.0, the deprecated `HiveContext` class has been removed. Use `SparkSession.builder.enableHiveSupport()` instead.
-
- In Spark version 2.4, when a spark session is created via
`cloneSession()`, the newly created spark session inherits its configuration
from its parent `SparkContext` even though the same configuration may exist
with a different value in its parent spark session. Since Spark 3.0, the
configurations of a parent `SparkSession` have a higher precedence over the
parent `SparkContext`. The old behavior can be restored by setting
`spark.sql.legacy.sessionInitWithConfigDefaults` to `true`.
- Since Spark 3.0, if `hive.default.fileformat` is not found in `Spark SQL
configuration` then it will fallback to hive-site.xml present in the `Hadoop
configuration` of `SparkContext`.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f8ad60b..9a5029e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -48,8 +48,6 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),
// [SPARK-28980][SQL][CORE][MLLIB] Remove more old deprecated items in Spark 3
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.createExternalTable"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createExternalTable"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.classification.LogisticRegressionWithSGD$"),
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 76a5bd0..70c0b27 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -113,7 +113,7 @@ def keyword_only(func):
# for back compatibility
-from pyspark.sql import SQLContext, Row
+from pyspark.sql import SQLContext, HiveContext, Row
__all__ = [
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel",
"Broadcast",
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 0a8d71c..c28cb8c 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -43,7 +43,7 @@ from __future__ import absolute_import
from pyspark.sql.types import Row
-from pyspark.sql.context import SQLContext, UDFRegistration
+from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration
from pyspark.sql.session import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.catalog import Catalog
@@ -55,7 +55,7 @@ from pyspark.sql.pandas.group_ops import PandasCogroupedOps
__all__ = [
- 'SparkSession', 'SQLContext', 'UDFRegistration',
+ 'SparkSession', 'SQLContext', 'HiveContext', 'UDFRegistration',
'DataFrame', 'GroupedData', 'Column', 'Catalog', 'Row',
'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
'DataFrameReader', 'DataFrameWriter', 'PandasCogroupedOps'
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 08cf6ee..974251f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -138,6 +138,26 @@ class Catalog(object):
isBucket=jcolumn.isBucket()))
return columns
+ @since(2.0)
+ def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
+ """Creates a table based on the dataset in a data source.
+
+ It returns the DataFrame associated with the external table.
+
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
+ created external table.
+
+ :return: :class:`DataFrame`
+ """
+ warnings.warn(
+ "createExternalTable is deprecated since Spark 2.2, please use
createTable instead.",
+ DeprecationWarning)
+ return self.createTable(tableName, path, source, schema, **options)
+
@since(2.2)
def createTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates a table based on the dataset in a data source.
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index f203e1c..0b7a7da 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -32,7 +32,7 @@ from pyspark.sql.types import IntegerType, Row, StringType
from pyspark.sql.udf import UDFRegistration
from pyspark.sql.utils import install_exception_handler
-__all__ = ["SQLContext"]
+__all__ = ["SQLContext", "HiveContext"]
class SQLContext(object):
@@ -338,6 +338,24 @@ class SQLContext(object):
"""
self.sparkSession.catalog.dropTempView(tableName)
+ @since(1.3)
+ def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
+ """Creates an external table based on the dataset in a data source.
+
+ It returns the DataFrame associated with the external table.
+
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
+ created external table.
+
+ :return: :class:`DataFrame`
+ """
+ return self.sparkSession.catalog.createExternalTable(
+ tableName, path, source, schema, **options)
+
@ignore_unicode_prefix
@since(1.0)
def sql(self, sqlQuery):
@@ -461,6 +479,53 @@ class SQLContext(object):
return StreamingQueryManager(self._ssql_ctx.streams())
+class HiveContext(SQLContext):
+ """A variant of Spark SQL that integrates with data stored in Hive.
+
+ Configuration for Hive is read from ``hive-site.xml`` on the classpath.
+ It supports running both SQL and HiveQL commands.
+
+ :param sparkContext: The SparkContext to wrap.
+ :param jhiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
+ :class:`HiveContext` in the JVM, instead we make all calls to this object.
+
+ .. note:: Deprecated in 2.0.0. Use SparkSession.builder.enableHiveSupport().getOrCreate().
+ """
+
+ def __init__(self, sparkContext, jhiveContext=None):
+ warnings.warn(
+ "HiveContext is deprecated in Spark 2.0.0. Please use " +
+ "SparkSession.builder.enableHiveSupport().getOrCreate() instead.",
+ DeprecationWarning)
+ if jhiveContext is None:
+ sparkContext._conf.set("spark.sql.catalogImplementation", "hive")
+ sparkSession = SparkSession.builder._sparkContext(sparkContext).getOrCreate()
+ else:
+ sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
+ SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext)
+
+ @classmethod
+ def _createForTesting(cls, sparkContext):
+ """(Internal use only) Create a new HiveContext for testing.
+
+ All test code that touches HiveContext *must* go through this method. Otherwise,
+ you may end up launching multiple derby instances and encounter with incredibly
+ confusing error messages.
+ """
+ jsc = sparkContext._jsc.sc()
+ jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
+ return cls(sparkContext, jtestHive)
+
+ def refreshTable(self, tableName):
+ """Invalidate and refresh all the cached the metadata of the given
+ table. For performance reasons, Spark SQL or the external data source
+ library it uses might cache certain metadata about a table, such as the
+ location of blocks. When those change outside of Spark SQL, users should
+ call this function to invalidate the cache.
+ """
+ self._ssql_ctx.refreshTable(tableName)
+
+
def _test():
import os
import doctest
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 592c64c..bbcc842 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -480,6 +480,97 @@ class SQLContext private[sql](val sparkSession: SparkSession)
/**
+ * Creates an external table from the given path and returns the corresponding DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(tableName: String, path: String): DataFrame = {
+ sparkSession.catalog.createTable(tableName, path)
+ }
+
+ /**
+ * Creates an external table from the given path based on a data source
+ * and returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ path: String,
+ source: String): DataFrame = {
+ sparkSession.catalog.createTable(tableName, path, source)
+ }
+
+ /**
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ sparkSession.catalog.createTable(tableName, source, options)
+ }
+
+ /**
+ * (Scala-specific)
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: Map[String, String]): DataFrame = {
+ sparkSession.catalog.createTable(tableName, source, options)
+ }
+
+ /**
+ * Create an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ sparkSession.catalog.createTable(tableName, source, schema, options)
+ }
+
+ /**
+ * (Scala-specific)
+ * Create an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
+ * @since 1.3.0
+ */
+ @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): DataFrame = {
+ sparkSession.catalog.createTable(tableName, source, schema, options)
+ }
+
+ /**
* Registers the given `DataFrame` as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 318cc62..60738e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalog
import scala.collection.JavaConverters._
-import org.apache.spark.annotation.Stable
+import org.apache.spark.annotation.{Evolving, Experimental, Stable}
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
@@ -215,6 +215,20 @@ abstract class Catalog {
* @param tableName is either a qualified or unqualified name that designates a table.
* If no database identifier is provided, it refers to a table in
* the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(tableName: String, path: String): DataFrame = {
+ createTable(tableName, path)
+ }
+
+ /**
+ * Creates a table from the given path and returns the corresponding DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
* @since 2.2.0
*/
def createTable(tableName: String, path: String): DataFrame
@@ -226,11 +240,42 @@ abstract class Catalog {
* @param tableName is either a qualified or unqualified name that designates a table.
* If no database identifier is provided, it refers to a table in
* the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+ createTable(tableName, path, source)
+ }
+
+ /**
+ * Creates a table from the given path based on a data source and returns the corresponding
+ * DataFrame.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
* @since 2.2.0
*/
def createTable(tableName: String, path: String, source: String): DataFrame
/**
+ * Creates a table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, options)
+ }
+
+ /**
* Creates a table based on the dataset in a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
@@ -248,6 +293,24 @@ abstract class Catalog {
/**
* (Scala-specific)
+ * Creates a table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: Map[String, String]): DataFrame = {
+ createTable(tableName, source, options)
+ }
+
+ /**
+ * (Scala-specific)
* Creates a table based on the dataset in a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
@@ -262,6 +325,24 @@ abstract class Catalog {
options: Map[String, String]): DataFrame
/**
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, schema, options)
+ }
+
+ /**
* Create a table based on the dataset in a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
@@ -280,6 +361,25 @@ abstract class Catalog {
/**
* (Scala-specific)
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table.
+ * If no database identifier is provided, it refers to a table in
+ * the current database.
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): DataFrame = {
+ createTable(tableName, source, schema, options)
+ }
+
+ /**
+ * (Scala-specific)
* Create a table based on the dataset in a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala
new file mode 100644
index 0000000..0b5cd3d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+class DeprecatedCreateExternalTableSuite extends SharedSparkSession {
+ test("createExternalTable with explicit path") {
+ withTable("t") {
+ withTempDir { dir =>
+ val path = new File(dir, "test")
+ spark.range(100).write.parquet(path.getAbsolutePath)
+ spark.catalog.createExternalTable(
+ tableName = "t",
+ path = path.getAbsolutePath
+ )
+ assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.tableType === CatalogTableType.EXTERNAL)
+ assert(table.provider === Some("parquet"))
+ assert(table.schema === new StructType().add("id", "long"))
+ assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+ }
+ }
+ }
+
+ test("createExternalTable with 'path' options") {
+ withTable("t") {
+ withTempDir { dir =>
+ val path = new File(dir, "test")
+ spark.range(100).write.parquet(path.getAbsolutePath)
+ spark.catalog.createExternalTable(
+ tableName = "t",
+ source = "parquet",
+ options = Map("path" -> path.getAbsolutePath))
+ assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.tableType === CatalogTableType.EXTERNAL)
+ assert(table.provider === Some("parquet"))
+ assert(table.schema === new StructType().add("id", "long"))
+ assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+ }
+ }
+ }
+
+ test("createExternalTable with explicit schema") {
+ withTable("t") {
+ withTempDir { dir =>
+ val path = new File(dir, "test")
+ spark.range(100).write.parquet(path.getAbsolutePath)
+ spark.catalog.createExternalTable(
+ tableName = "t",
+ source = "parquet",
+ schema = new StructType().add("i", "int"),
+ options = Map("path" -> path.getAbsolutePath))
+ assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.tableType === CatalogTableType.EXTERNAL)
+ assert(table.provider === Some("parquet"))
+ assert(table.schema === new StructType().add("i", "int"))
+ assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
new file mode 100644
index 0000000..02a5117
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
+
+/**
+ * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
+ * Configuration for Hive is read from hive-site.xml on the classpath.
+ */
+@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0")
+class HiveContext private[hive](_sparkSession: SparkSession)
+ extends SQLContext(_sparkSession) with Logging {
+
+ self =>
+
+ def this(sc: SparkContext) = {
+ this(SparkSession.builder().sparkContext(HiveUtils.withHiveExternalCatalog(sc)).getOrCreate())
+ }
+
+ def this(sc: JavaSparkContext) = this(sc.sc)
+
+ /**
+ * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF,
+ * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
+ * and Hive client (both of execution and metadata) with existing HiveContext.
+ */
+ override def newSession(): HiveContext = {
+ new HiveContext(sparkSession.newSession())
+ }
+
+ /**
+ * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
+ * Spark SQL or the external data source library it uses might cache certain metadata about a
+ * table, such as the location of blocks. When those change outside of Spark SQL, users should
+ * call this function to invalidate the cache.
+ *
+ * @since 1.3.0
+ */
+ def refreshTable(tableName: String): Unit = {
+ sparkSession.catalog.refreshTable(tableName)
+ }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
new file mode 100644
index 0000000..a80db76
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+
+class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ override protected val enableAutoThreadAudit = false
+ private var sc: SparkContext = null
+ private var hc: HiveContext = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("test"))
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) =>
+ sc.hadoopConfiguration.set(k, v)
+ }
+ hc = new HiveContext(sc)
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ hc.sharedState.cacheManager.clearCache()
+ hc.sessionState.catalog.reset()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ sc = null
+ hc = null
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test("basic operations") {
+ val _hc = hc
+ import _hc.implicits._
+ val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x")
+ val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c")
+ .select($"a", $"b")
+ .filter($"a" > 10 && $"b" > 6 && $"c")
+ val df3 = df1.join(df2, "a")
+ val res = df3.collect()
+ val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect()
+ assert(res.toSeq == expected.toSeq)
+ df3.createOrReplaceTempView("mai_table")
+ val df4 = hc.table("mai_table")
+ val res2 = df4.collect()
+ assert(res2.toSeq == expected.toSeq)
+ }
+
+ test("basic DDLs") {
+ val _hc = hc
+ import _hc.implicits._
+ val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases.toSeq == Seq("default"))
+ hc.sql("CREATE DATABASE mee_db")
+ hc.sql("USE mee_db")
+ val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases2.toSet == Set("default", "mee_db"))
+ val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age")
+ df.createOrReplaceTempView("mee_table")
+ hc.sql("CREATE TABLE moo_table (name string, age int)")
+ hc.sql("INSERT INTO moo_table SELECT * FROM mee_table")
+ assert(
+ hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
+ df.collect().toSeq.sortBy(_.getString(0)))
+ val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
+ assert(tables.toSet == Set("moo_table", "mee_table"))
+ hc.sql("DROP TABLE moo_table")
+ hc.sql("DROP TABLE mee_table")
+ val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
+ assert(tables2.isEmpty)
+ hc.sql("USE default")
+ hc.sql("DROP DATABASE mee_db CASCADE")
+ val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases3.toSeq == Seq("default"))
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]