Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c7e013f18 -> c0cc921a5
[SPARK-15424][SPARK-15437][SPARK-14807][SQL] Revert Create a hivecontext-compatibility module

## What changes were proposed in this pull request?
I initially asked to create a hivecontext-compatibility module to put the HiveContext there. But we are so close to Spark 2.0 release and there is only a single class in it. It seems overkill to have an entire package, which makes it more inconvenient, for a single class.

## How was this patch tested?
Tests were moved.

Author: Reynold Xin <[email protected]>

Closes #13207 from rxin/SPARK-15424.

(cherry picked from commit 45b7557e61d440612d4ce49c31b5ef242fdefa54)
Signed-off-by: Reynold Xin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0cc921a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0cc921a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0cc921a

Branch: refs/heads/branch-2.0
Commit: c0cc921a577c90f058ef84367ae2a92ecae8677a
Parents: c7e013f
Author: Reynold Xin <[email protected]>
Authored: Fri May 20 22:01:55 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri May 20 22:02:07 2016 -0700

----------------------------------------------------------------------
 dev/run-tests.py | 2 +-
 dev/sparktestsupport/modules.py | 12 ---
 pom.xml | 1 -
 project/SparkBuild.scala | 6 +-
 .../org/apache/spark/sql/hive/HiveContext.scala | 73 +++++++++++++
 .../hive/HiveContextCompatibilitySuite.scala | 101 ++++++++++++++++++
 sql/hivecontext-compatibility/pom.xml | 57 -----------
 .../org/apache/spark/sql/hive/HiveContext.scala | 73 -------------
 .../hive/HiveContextCompatibilitySuite.scala | 102 -------------------
 9 files changed, 178 insertions(+), 249 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py 
b/dev/run-tests.py index 7b32697..2030c4a 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules): ['graphx', 'examples'] >>> x = [x.name for x in determine_modules_to_test([modules.sql])] >>> x # doctest: +NORMALIZE_WHITESPACE - ['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver', 'hivecontext-compatibility', + ['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver', 'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml'] """ modules_to_test = set() http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 0d6aa74..8e2364d 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -158,18 +158,6 @@ hive_thriftserver = Module( ) -hivecontext_compatibility = Module( - name="hivecontext-compatibility", - dependencies=[hive], - source_file_regexes=[ - "sql/hivecontext-compatibility/", - ], - sbt_test_goals=[ - "hivecontext-compatibility/test" - ] -) - - sketch = Module( name="sketch", dependencies=[tags], http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9c13af1..e778f77 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,6 @@ <module>sql/catalyst</module> <module>sql/core</module> <module>sql/hive</module> - <module>sql/hivecontext-compatibility</module> <module>assembly</module> <module>external/flume</module> <module>external/flume-sink</module> http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3ad9873..f08ca70 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -39,8 
+39,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, hiveCompatibility) = Seq( - "catalyst", "sql", "hive", "hive-thriftserver", "hivecontext-compatibility" + val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq( + "catalyst", "sql", "hive", "hive-thriftserver" ).map(ProjectRef(buildLocation, _)) val streamingProjects@Seq( @@ -339,7 +339,7 @@ object SparkBuild extends PomBuild { val mimaProjects = allProjects.filterNot { x => Seq( - spark, hive, hiveThriftServer, hiveCompatibility, catalyst, repl, networkCommon, networkShuffle, networkYarn, + spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, unsafe, tags, sketch, mllibLocal ).contains(x) } http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala new file mode 100644 index 0000000..415d4c0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.SparkContext +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{SparkSession, SQLContext} + + +/** + * An instance of the Spark SQL execution engine that integrates with data stored in Hive. + * Configuration for Hive is read from hive-site.xml on the classpath. + */ +@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0") +class HiveContext private[hive]( + _sparkSession: SparkSession, + isRootContext: Boolean) + extends SQLContext(_sparkSession, isRootContext) with Logging { + + self => + + def this(sc: SparkContext) = { + this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) + } + + def this(sc: JavaSparkContext) = this(sc.sc) + + /** + * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, + * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader + * and Hive client (both of execution and metadata) with existing HiveContext. + */ + override def newSession(): HiveContext = { + new HiveContext(sparkSession.newSession(), isRootContext = false) + } + + protected[sql] override def sessionState: HiveSessionState = { + sparkSession.sessionState.asInstanceOf[HiveSessionState] + } + + protected[sql] override def sharedState: HiveSharedState = { + sparkSession.sharedState.asInstanceOf[HiveSharedState] + } + + /** + * Invalidate and refresh all the cached the metadata of the given table. 
For performance reasons, + * Spark SQL or the external data source library it uses might cache certain metadata about a + * table, such as the location of blocks. When those change outside of Spark SQL, users should + * call this function to invalidate the cache. + * + * @since 1.3.0 + */ + def refreshTable(tableName: String): Unit = { + sparkSession.catalog.refreshTable(tableName) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala new file mode 100644 index 0000000..3aa8174 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -0,0 +1,101 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.hive + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} + + +class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { + + private var sc: SparkContext = null + private var hc: HiveContext = null + + override def beforeAll(): Unit = { + super.beforeAll() + sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("test")) + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) => + sc.hadoopConfiguration.set(k, v) + } + hc = new HiveContext(sc) + } + + override def afterEach(): Unit = { + try { + hc.sharedState.cacheManager.clearCache() + hc.sessionState.catalog.reset() + } finally { + super.afterEach() + } + } + + override def afterAll(): Unit = { + try { + sc = null + hc = null + } finally { + super.afterAll() + } + } + + test("basic operations") { + val _hc = hc + import _hc.implicits._ + val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x") + val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c") + .select($"a", $"b") + .filter($"a" > 10 && $"b" > 6 && $"c") + val df3 = df1.join(df2, "a") + val res = df3.collect() + val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect() + assert(res.toSeq == expected.toSeq) + df3.createOrReplaceTempView("mai_table") + val df4 = hc.table("mai_table") + val res2 = df4.collect() + assert(res2.toSeq == expected.toSeq) + } + + test("basic DDLs") { + val _hc = hc + import _hc.implicits._ + val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases.toSeq == Seq("default")) + hc.sql("CREATE DATABASE mee_db") + hc.sql("USE mee_db") + val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases2.toSet == Set("default", "mee_db")) + val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age") + df.createOrReplaceTempView("mee_table") + hc.sql("CREATE TABLE moo_table 
(name string, age int)") + hc.sql("INSERT INTO moo_table SELECT * FROM mee_table") + assert( + hc.sql("SELECT * FROM moo_table order by name").collect().toSeq == + df.collect().toSeq.sortBy(_.getString(0))) + val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables.toSet == Set("moo_table", "mee_table")) + hc.sql("DROP TABLE moo_table") + hc.sql("DROP TABLE mee_table") + val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables2.isEmpty) + hc.sql("DROP DATABASE mee_db CASCADE") + val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases3.toSeq == Seq("default")) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/sql/hivecontext-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/sql/hivecontext-compatibility/pom.xml b/sql/hivecontext-compatibility/pom.xml deleted file mode 100644 index ed9ef8e..0000000 --- a/sql/hivecontext-compatibility/pom.xml +++ /dev/null @@ -1,57 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-hivecontext-compatibility_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project HiveContext Compatibility</name> - <url>http://spark.apache.org/</url> - <properties> - <sbt.project.name>hivecontext-compatibility</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala deleted file mode 100644 index 415d4c0..0000000 --- a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.SparkContext -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession, SQLContext} - - -/** - * An instance of the Spark SQL execution engine that integrates with data stored in Hive. - * Configuration for Hive is read from hive-site.xml on the classpath. - */ -@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0") -class HiveContext private[hive]( - _sparkSession: SparkSession, - isRootContext: Boolean) - extends SQLContext(_sparkSession, isRootContext) with Logging { - - self => - - def this(sc: SparkContext) = { - this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) - } - - def this(sc: JavaSparkContext) = this(sc.sc) - - /** - * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, - * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader - * and Hive client (both of execution and metadata) with existing HiveContext. 
- */ - override def newSession(): HiveContext = { - new HiveContext(sparkSession.newSession(), isRootContext = false) - } - - protected[sql] override def sessionState: HiveSessionState = { - sparkSession.sessionState.asInstanceOf[HiveSessionState] - } - - protected[sql] override def sharedState: HiveSharedState = { - sparkSession.sharedState.asInstanceOf[HiveSharedState] - } - - /** - * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, - * Spark SQL or the external data source library it uses might cache certain metadata about a - * table, such as the location of blocks. When those change outside of Spark SQL, users should - * call this function to invalidate the cache. - * - * @since 1.3.0 - */ - def refreshTable(tableName: String): Unit = { - sparkSession.catalog.refreshTable(tableName) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/c0cc921a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala deleted file mode 100644 index 1c1db72..0000000 --- a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.hive - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.{SparkContext, SparkFunSuite} - - -class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { - - private var sc: SparkContext = null - private var hc: HiveContext = null - - override def beforeAll(): Unit = { - super.beforeAll() - sc = new SparkContext("local[4]", "test") - HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) => - sc.hadoopConfiguration.set(k, v) - } - hc = new HiveContext(sc) - } - - override def afterEach(): Unit = { - try { - hc.sharedState.cacheManager.clearCache() - hc.sessionState.catalog.reset() - } finally { - super.afterEach() - } - } - - override def afterAll(): Unit = { - try { - sc.stop() - sc = null - hc = null - } finally { - super.afterAll() - } - } - - test("basic operations") { - val _hc = hc - import _hc.implicits._ - val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x") - val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c") - .select($"a", $"b") - .filter($"a" > 10 && $"b" > 6 && $"c") - val df3 = df1.join(df2, "a") - val res = df3.collect() - val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect() - assert(res.toSeq == expected.toSeq) - df3.createOrReplaceTempView("mai_table") - val df4 = hc.table("mai_table") - val res2 = df4.collect() - assert(res2.toSeq == expected.toSeq) - } - - test("basic DDLs") { - val _hc = hc - import _hc.implicits._ - val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) - 
assert(databases.toSeq == Seq("default")) - hc.sql("CREATE DATABASE mee_db") - hc.sql("USE mee_db") - val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) - assert(databases2.toSet == Set("default", "mee_db")) - val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age") - df.createOrReplaceTempView("mee_table") - hc.sql("CREATE TABLE moo_table (name string, age int)") - hc.sql("INSERT INTO moo_table SELECT * FROM mee_table") - assert( - hc.sql("SELECT * FROM moo_table order by name").collect().toSeq == - df.collect().toSeq.sortBy(_.getString(0))) - val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) - assert(tables.toSet == Set("moo_table", "mee_table")) - hc.sql("DROP TABLE moo_table") - hc.sql("DROP TABLE mee_table") - val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) - assert(tables2.isEmpty) - hc.sql("DROP DATABASE mee_db CASCADE") - val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) - assert(databases3.toSeq == Seq("default")) - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
