[SPARK-13477][SQL] Expose new user-facing Catalog interface

## What changes were proposed in this pull request?
#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds a catalog interface. ## How was this patch tested? See `CatalogSuite`. Author: Andrew Or <[email protected]> Closes #12713 from andrewor14/user-facing-catalog. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8a83a56 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8a83a56 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8a83a56 Branch: refs/heads/master Commit: d8a83a564ff3fd0281007adbf8aa3757da8a2c2b Parents: d93976d Author: Andrew Or <[email protected]> Authored: Tue Apr 26 21:29:25 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Apr 26 21:29:25 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/ScalaReflection.scala | 26 +- .../sql/catalyst/catalog/SessionCatalog.scala | 36 +- .../spark/sql/catalyst/catalog/interface.scala | 8 +- .../catalyst/encoders/ExpressionEncoder.scala | 5 +- .../sql/catalyst/catalog/CatalogTestCases.scala | 580 ------------------ .../catalyst/catalog/ExternalCatalogSuite.scala | 581 +++++++++++++++++++ .../catalyst/catalog/InMemoryCatalogSuite.scala | 2 +- .../catalyst/catalog/SessionCatalogSuite.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 33 +- .../org/apache/spark/sql/SparkSession.scala | 248 +------- .../org/apache/spark/sql/catalog/Catalog.scala | 214 +++++++ .../apache/spark/sql/catalog/interface.scala | 101 ++++ .../spark/sql/execution/SparkSqlParser.scala | 6 +- .../spark/sql/execution/command/cache.scala | 4 +- .../spark/sql/execution/command/commands.scala | 4 +- .../command/createDataSourceTables.scala | 4 +- .../spark/sql/execution/command/ddl.scala | 8 +- .../spark/sql/execution/command/tables.scala | 2 +- .../spark/sql/execution/command/views.scala | 2 +- .../apache/spark/sql/internal/CatalogImpl.scala | 352 +++++++++++ .../spark/sql/execution/command/DDLSuite.scala 
| 2 +- .../spark/sql/internal/CatalogSuite.scala | 271 +++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/MetastoreRelation.scala | 8 +- .../spark/sql/hive/client/HiveClientImpl.scala | 16 +- .../spark/sql/hive/CachedTableSuite.scala | 4 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 18 +- .../sql/hive/HiveExternalCatalogSuite.scala | 2 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 6 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 10 +- .../spark/sql/hive/client/VersionsSuite.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 6 +- 32 files changed, 1665 insertions(+), 900 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index bd72313..be67605 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.Utils + + +/** + * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s + * for classes whose fields are entirely defined by constructor params but should not be + * case classes. + */ +private[sql] trait DefinedByConstructorParams + /** * A default version of ScalaReflection that uses the runtime universe. 
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection { "toScalaMap", keyData :: valueData :: Nil) - case t if t <:< localTypeOf[Product] => + case t if definedByConstructorParams(t) => val params = getConstructorParameters(t) val cls = getClassFromType(tpe) @@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection { val clsName = getClassNameFromType(tpe) val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil serializerFor(inputObject, tpe, walkedTypePath) match { - case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s + case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil) } } @@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection { serializerFor(unwrapped, optType, newPath)) } - case t if t <:< localTypeOf[Product] => + case t if definedByConstructorParams(t) => val params = getConstructorParameters(t) val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) => val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType)) @@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection { val Schema(valueDataType, valueNullable) = schemaFor(valueType) Schema(MapType(schemaFor(keyType).dataType, valueDataType, valueContainsNull = valueNullable), nullable = true) - case t if t <:< localTypeOf[Product] => + case t if definedByConstructorParams(t) => val params = getConstructorParameters(t) Schema(StructType( params.map { case (fieldName, fieldType) => @@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection { throw new UnsupportedOperationException(s"Schema for type $other is not supported") } } + + /** + * Whether the fields of the given type is defined entirely by its constructor parameters. 
+ */ + private[sql] def definedByConstructorParams(tpe: Type): Boolean = { + tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams] + } + } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 402aacf..91d35de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -613,6 +613,25 @@ class SessionCatalog( } /** + * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists. + */ + private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = { + // TODO: just make function registry take in FunctionIdentifier instead of duplicating this + val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb))) + functionRegistry.lookupFunction(name.funcName) + .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString)) + .getOrElse { + val db = qualifiedName.database.get + if (externalCatalog.functionExists(db, name.funcName)) { + val metadata = externalCatalog.getFunction(db, name.funcName) + new ExpressionInfo(metadata.className, qualifiedName.unquotedString) + } else { + failFunctionLookup(name.funcName) + } + } + } + + /** * Return an [[Expression]] that represents the specified function, assuming it exists. 
* * For a temporary function or a permanent function that has been loaded, @@ -646,6 +665,7 @@ class SessionCatalog( // The function has not been loaded to the function registry, which means // that the function is a permanent function (if it actually has been registered // in the metastore). We need to first put the function in the FunctionRegistry. + // TODO: why not just check whether the function exists first? val catalogFunction = try { externalCatalog.getFunction(currentDb, name.funcName) } catch { @@ -662,7 +682,7 @@ class SessionCatalog( val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className) createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false) // Now, we need to create the Expression. - return functionRegistry.lookupFunction(qualifiedName.unquotedString, children) + functionRegistry.lookupFunction(qualifiedName.unquotedString, children) } /** @@ -687,8 +707,8 @@ class SessionCatalog( // ----------------- /** - * Drop all existing databases (except "default") along with all associated tables, - * partitions and functions, and set the current database to "default". + * Drop all existing databases (except "default"), tables, partitions and functions, + * and set the current database to "default". * * This is mainly used for tests. 
*/ @@ -697,6 +717,16 @@ class SessionCatalog( listDatabases().filter(_ != default).foreach { db => dropDatabase(db, ignoreIfNotExists = false, cascade = true) } + listTables(default).foreach { table => + dropTable(table, ignoreIfNotExists = false) + } + listFunctions(default).foreach { func => + if (func.database.isDefined) { + dropFunction(func, ignoreIfNotExists = false) + } else { + dropTempFunction(func.funcName, ignoreIfNotExists = false) + } + } tempTables.clear() functionRegistry.clear() // restore built-in functions http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 9e90987..d1e2b3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -299,10 +299,10 @@ case class CatalogTable( case class CatalogTableType private(name: String) object CatalogTableType { - val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE") - val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE") - val INDEX_TABLE = new CatalogTableType("INDEX_TABLE") - val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW") + val EXTERNAL = new CatalogTableType("EXTERNAL") + val MANAGED = new CatalogTableType("MANAGED") + val INDEX = new CatalogTableType("INDEX") + val VIEW = new CatalogTableType("VIEW") } http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 56d29cf..5d29448 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -47,8 +47,9 @@ object ExpressionEncoder { def apply[T : TypeTag](): ExpressionEncoder[T] = { // We convert the not-serializable TypeTag into StructType and ClassTag. val mirror = typeTag[T].mirror - val cls = mirror.runtimeClass(typeTag[T].tpe) - val flat = !classOf[Product].isAssignableFrom(cls) + val tpe = typeTag[T].tpe + val cls = mirror.runtimeClass(tpe) + val flat = !ScalaReflection.definedByConstructorParams(tpe) val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false) val serializer = ScalaReflection.serializerFor[T](inputObject) http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala deleted file mode 100644 index f961fe3..0000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.catalog - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.util.Utils - - -/** - * A reasonable complete test suite (i.e. behaviors) for a [[ExternalCatalog]]. - * - * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this. - */ -abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { - protected val utils: CatalogTestUtils - import utils._ - - protected def resetState(): Unit = { } - - // Clear all state after each test - override def afterEach(): Unit = { - try { - resetState() - } finally { - super.afterEach() - } - } - - // -------------------------------------------------------------------------- - // Databases - // -------------------------------------------------------------------------- - - test("basic create and list databases") { - val catalog = newEmptyCatalog() - catalog.createDatabase(newDb("default"), ignoreIfExists = true) - assert(catalog.databaseExists("default")) - assert(!catalog.databaseExists("testing")) - assert(!catalog.databaseExists("testing2")) - catalog.createDatabase(newDb("testing"), ignoreIfExists = false) - assert(catalog.databaseExists("testing")) - assert(catalog.listDatabases().toSet == Set("default", "testing")) - catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) - assert(catalog.listDatabases().toSet == 
Set("default", "testing", "testing2")) - assert(catalog.databaseExists("testing2")) - assert(!catalog.databaseExists("does_not_exist")) - } - - test("get database when a database exists") { - val db1 = newBasicCatalog().getDatabase("db1") - assert(db1.name == "db1") - assert(db1.description.contains("db1")) - } - - test("get database should throw exception when the database does not exist") { - intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") } - } - - test("list databases without pattern") { - val catalog = newBasicCatalog() - assert(catalog.listDatabases().toSet == Set("default", "db1", "db2")) - } - - test("list databases with pattern") { - val catalog = newBasicCatalog() - assert(catalog.listDatabases("db").toSet == Set.empty) - assert(catalog.listDatabases("db*").toSet == Set("db1", "db2")) - assert(catalog.listDatabases("*1").toSet == Set("db1")) - assert(catalog.listDatabases("db2").toSet == Set("db2")) - } - - test("drop database") { - val catalog = newBasicCatalog() - catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) - assert(catalog.listDatabases().toSet == Set("default", "db2")) - } - - test("drop database when the database is not empty") { - // Throw exception if there are functions left - val catalog1 = newBasicCatalog() - catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false) - catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false) - intercept[AnalysisException] { - catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) - } - resetState() - - // Throw exception if there are tables left - val catalog2 = newBasicCatalog() - catalog2.dropFunction("db2", "func1") - intercept[AnalysisException] { - catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) - } - resetState() - - // When cascade is true, it should drop them - val catalog3 = newBasicCatalog() - catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) - 
assert(catalog3.listDatabases().toSet == Set("default", "db1")) - } - - test("drop database when the database does not exist") { - val catalog = newBasicCatalog() - - intercept[AnalysisException] { - catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) - } - - catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) - } - - test("alter database") { - val catalog = newBasicCatalog() - val db1 = catalog.getDatabase("db1") - // Note: alter properties here because Hive does not support altering other fields - catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) - val newDb1 = catalog.getDatabase("db1") - assert(db1.properties.isEmpty) - assert(newDb1.properties.size == 2) - assert(newDb1.properties.get("k") == Some("v3")) - assert(newDb1.properties.get("good") == Some("true")) - } - - test("alter database should throw exception when the database does not exist") { - intercept[AnalysisException] { - newBasicCatalog().alterDatabase(newDb("does_not_exist")) - } - } - - // -------------------------------------------------------------------------- - // Tables - // -------------------------------------------------------------------------- - - test("the table type of an external table should be EXTERNAL_TABLE") { - val catalog = newBasicCatalog() - val table = - newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE) - catalog.createTable("db2", table, ignoreIfExists = false) - val actual = catalog.getTable("db2", "external_table1") - assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE) - } - - test("drop table") { - val catalog = newBasicCatalog() - assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false) - assert(catalog.listTables("db2").toSet == Set("tbl2")) - } - - test("drop table when database/table does not exist") { - val catalog = newBasicCatalog() - // Should 
always throw exception when the database does not exist - intercept[AnalysisException] { - catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false) - } - intercept[AnalysisException] { - catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true) - } - // Should throw exception when the table does not exist, if ignoreIfNotExists is false - intercept[AnalysisException] { - catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false) - } - catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true) - } - - test("rename table") { - val catalog = newBasicCatalog() - assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - catalog.renameTable("db2", "tbl1", "tblone") - assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2")) - } - - test("rename table when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.renameTable("unknown_db", "unknown_table", "unknown_table") - } - intercept[AnalysisException] { - catalog.renameTable("db2", "unknown_table", "unknown_table") - } - } - - test("alter table") { - val catalog = newBasicCatalog() - val tbl1 = catalog.getTable("db2", "tbl1") - catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem"))) - val newTbl1 = catalog.getTable("db2", "tbl1") - assert(!tbl1.properties.contains("toh")) - assert(newTbl1.properties.size == tbl1.properties.size + 1) - assert(newTbl1.properties.get("toh") == Some("frem")) - } - - test("alter table when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db")) - } - intercept[AnalysisException] { - catalog.alterTable("db2", newTable("unknown_table", "db2")) - } - } - - test("get table") { - assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") - } - - test("get table when database/table does not exist") { - val catalog = 
newBasicCatalog() - intercept[AnalysisException] { - catalog.getTable("unknown_db", "unknown_table") - } - intercept[AnalysisException] { - catalog.getTable("db2", "unknown_table") - } - } - - test("list tables without pattern") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db") } - assert(catalog.listTables("db1").toSet == Set.empty) - assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - } - - test("list tables with pattern") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } - assert(catalog.listTables("db1", "*").toSet == Set.empty) - assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) - assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) - assert(catalog.listTables("db2", "*1").toSet == Set("tbl1")) - } - - // -------------------------------------------------------------------------- - // Partitions - // -------------------------------------------------------------------------- - - test("basic create and list partitions") { - val catalog = newEmptyCatalog() - catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false) - catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false) - assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2))) - } - - test("create partitions when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false) - } - intercept[AnalysisException] { - catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false) - } - } - - test("create partitions that already exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false) - } - 
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true) - } - - test("drop partitions") { - val catalog = newBasicCatalog() - assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) - catalog.dropPartitions( - "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false) - assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) - resetState() - val catalog2 = newBasicCatalog() - assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) - catalog2.dropPartitions( - "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false) - assert(catalog2.listPartitions("db2", "tbl2").isEmpty) - } - - test("drop partitions when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.dropPartitions( - "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false) - } - intercept[AnalysisException] { - catalog.dropPartitions( - "db2", "does_not_exist", Seq(), ignoreIfNotExists = false) - } - } - - test("drop partitions that do not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false) - } - catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true) - } - - test("get partition") { - val catalog = newBasicCatalog() - assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec) - assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec) - intercept[AnalysisException] { - catalog.getPartition("db2", "tbl1", part3.spec) - } - } - - test("get partition when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.getPartition("does_not_exist", "tbl1", part1.spec) - } - intercept[AnalysisException] { - catalog.getPartition("db2", "does_not_exist", part1.spec) - } - } - - test("rename partitions") { - val catalog = newBasicCatalog() - val newPart1 = 
part1.copy(spec = Map("a" -> "100", "b" -> "101")) - val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) - val newSpecs = Seq(newPart1.spec, newPart2.spec) - catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs) - assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec) - assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec) - // The old partitions should no longer exist - intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) } - intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) } - } - - test("rename partitions when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec)) - } - intercept[AnalysisException] { - catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec)) - } - } - - test("alter partitions") { - val catalog = newBasicCatalog() - try { - // Note: Before altering table partitions in Hive, you *must* set the current database - // to the one that contains the table of interest. Otherwise you will end up with the - // most helpful error message ever: "Unable to alter partition. alter is not possible." - // See HIVE-2742 for more detail. 
- catalog.setCurrentDatabase("db2") - val newLocation = newUriForDatabase() - // alter but keep spec the same - val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec) - val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec) - catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) - val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec) - val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec) - assert(newPart1.storage.locationUri == Some(newLocation)) - assert(newPart2.storage.locationUri == Some(newLocation)) - assert(oldPart1.storage.locationUri != Some(newLocation)) - assert(oldPart2.storage.locationUri != Some(newLocation)) - // alter but change spec, should fail because new partition specs do not exist yet - val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) - val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) - intercept[AnalysisException] { - catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2)) - } - } finally { - // Remember to restore the original current database, which we assume to be "default" - catalog.setCurrentDatabase("default") - } - } - - test("alter partitions when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1)) - } - intercept[AnalysisException] { - catalog.alterPartitions("db2", "does_not_exist", Seq(part1)) - } - } - - // -------------------------------------------------------------------------- - // Functions - // -------------------------------------------------------------------------- - - test("basic create and list functions") { - val catalog = newEmptyCatalog() - catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createFunction("mydb", newFunc("myfunc")) - 
assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc")) - } - - test("create function when database does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.createFunction("does_not_exist", newFunc()) - } - } - - test("create function that already exists") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.createFunction("db2", newFunc("func1")) - } - } - - test("drop function") { - val catalog = newBasicCatalog() - assert(catalog.listFunctions("db2", "*").toSet == Set("func1")) - catalog.dropFunction("db2", "func1") - assert(catalog.listFunctions("db2", "*").isEmpty) - } - - test("drop function when database does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.dropFunction("does_not_exist", "something") - } - } - - test("drop function that does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.dropFunction("db2", "does_not_exist") - } - } - - test("get function") { - val catalog = newBasicCatalog() - assert(catalog.getFunction("db2", "func1") == - CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass, - Seq.empty[(String, String)])) - intercept[AnalysisException] { - catalog.getFunction("db2", "does_not_exist") - } - } - - test("get function when database does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.getFunction("does_not_exist", "func1") - } - } - - test("rename function") { - val catalog = newBasicCatalog() - val newName = "funcky" - assert(catalog.getFunction("db2", "func1").className == funcClass) - catalog.renameFunction("db2", "func1", newName) - intercept[AnalysisException] { catalog.getFunction("db2", "func1") } - assert(catalog.getFunction("db2", newName).identifier.funcName == newName) - assert(catalog.getFunction("db2", newName).className == funcClass) - intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", 
"me") } - } - - test("rename function when database does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.renameFunction("does_not_exist", "func1", "func5") - } - } - - test("list functions") { - val catalog = newBasicCatalog() - catalog.createFunction("db2", newFunc("func2")) - catalog.createFunction("db2", newFunc("not_me")) - assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me")) - assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2")) - } - -} - - -/** - * A collection of utility fields and methods for tests related to the [[ExternalCatalog]]. - */ -abstract class CatalogTestUtils { - - // Unimplemented methods - val tableInputFormat: String - val tableOutputFormat: String - def newEmptyCatalog(): ExternalCatalog - - // These fields must be lazy because they rely on fields that are not implemented yet - lazy val storageFormat = CatalogStorageFormat( - locationUri = None, - inputFormat = Some(tableInputFormat), - outputFormat = Some(tableOutputFormat), - serde = None, - serdeProperties = Map.empty) - lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) - lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) - lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) - lazy val funcClass = "org.apache.spark.myFunc" - - /** - * Creates a basic catalog, with the following structure: - * - * default - * db1 - * db2 - * - tbl1 - * - tbl2 - * - part1 - * - part2 - * - func1 - */ - def newBasicCatalog(): ExternalCatalog = { - val catalog = newEmptyCatalog() - // When testing against a real catalog, the default database may already exist - catalog.createDatabase(newDb("default"), ignoreIfExists = true) - catalog.createDatabase(newDb("db1"), ignoreIfExists = false) - catalog.createDatabase(newDb("db2"), ignoreIfExists = false) - catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = 
false) - catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false) - catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false) - catalog.createFunction("db2", newFunc("func1", Some("db2"))) - catalog - } - - def newFunc(): CatalogFunction = newFunc("funcName") - - def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath - - def newDb(name: String): CatalogDatabase = { - CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) - } - - def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db)) - - def newTable(name: String, database: Option[String] = None): CatalogTable = { - CatalogTable( - identifier = TableIdentifier(name, database), - tableType = CatalogTableType.EXTERNAL_TABLE, - storage = storageFormat, - schema = Seq( - CatalogColumn("col1", "int"), - CatalogColumn("col2", "string"), - CatalogColumn("a", "int"), - CatalogColumn("b", "string")), - partitionColumnNames = Seq("a", "b")) - } - - def newFunc(name: String, database: Option[String] = None): CatalogFunction = { - CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)]) - } - - /** - * Whether the catalog's table partitions equal the ones given. - * Note: Hive sets some random serde things, so we just compare the specs here. 
- */ - def catalogPartitionsEqual( - catalog: ExternalCatalog, - db: String, - table: String, - parts: Seq[CatalogTablePartition]): Boolean = { - catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala new file mode 100644 index 0000000..d739b17 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.util.Utils + + +/** + * A reasonable complete test suite (i.e. 
behaviors) for a [[ExternalCatalog]]. + * + * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this. + */ +abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach { + protected val utils: CatalogTestUtils + import utils._ + + protected def resetState(): Unit = { } + + // Clear all state after each test + override def afterEach(): Unit = { + try { + resetState() + } finally { + super.afterEach() + } + } + + // -------------------------------------------------------------------------- + // Databases + // -------------------------------------------------------------------------- + + test("basic create and list databases") { + val catalog = newEmptyCatalog() + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + assert(catalog.databaseExists("default")) + assert(!catalog.databaseExists("testing")) + assert(!catalog.databaseExists("testing2")) + catalog.createDatabase(newDb("testing"), ignoreIfExists = false) + assert(catalog.databaseExists("testing")) + assert(catalog.listDatabases().toSet == Set("default", "testing")) + catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) + assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2")) + assert(catalog.databaseExists("testing2")) + assert(!catalog.databaseExists("does_not_exist")) + } + + test("get database when a database exists") { + val db1 = newBasicCatalog().getDatabase("db1") + assert(db1.name == "db1") + assert(db1.description.contains("db1")) + } + + test("get database should throw exception when the database does not exist") { + intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") } + } + + test("list databases without pattern") { + val catalog = newBasicCatalog() + assert(catalog.listDatabases().toSet == Set("default", "db1", "db2")) + } + + test("list databases with pattern") { + val catalog = newBasicCatalog() + assert(catalog.listDatabases("db").toSet == Set.empty) 
+ assert(catalog.listDatabases("db*").toSet == Set("db1", "db2")) + assert(catalog.listDatabases("*1").toSet == Set("db1")) + assert(catalog.listDatabases("db2").toSet == Set("db2")) + } + + test("drop database") { + val catalog = newBasicCatalog() + catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) + assert(catalog.listDatabases().toSet == Set("default", "db2")) + } + + test("drop database when the database is not empty") { + // Throw exception if there are functions left + val catalog1 = newBasicCatalog() + catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false) + catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false) + intercept[AnalysisException] { + catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } + resetState() + + // Throw exception if there are tables left + val catalog2 = newBasicCatalog() + catalog2.dropFunction("db2", "func1") + intercept[AnalysisException] { + catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } + resetState() + + // When cascade is true, it should drop them + val catalog3 = newBasicCatalog() + catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) + assert(catalog3.listDatabases().toSet == Set("default", "db1")) + } + + test("drop database when the database does not exist") { + val catalog = newBasicCatalog() + + intercept[AnalysisException] { + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) + } + + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) + } + + test("alter database") { + val catalog = newBasicCatalog() + val db1 = catalog.getDatabase("db1") + // Note: alter properties here because Hive does not support altering other fields + catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) + val newDb1 = catalog.getDatabase("db1") + assert(db1.properties.isEmpty) + assert(newDb1.properties.size == 2) + 
assert(newDb1.properties.get("k") == Some("v3")) + assert(newDb1.properties.get("good") == Some("true")) + } + + test("alter database should throw exception when the database does not exist") { + intercept[AnalysisException] { + newBasicCatalog().alterDatabase(newDb("does_not_exist")) + } + } + + // -------------------------------------------------------------------------- + // Tables + // -------------------------------------------------------------------------- + + test("the table type of an external table should be EXTERNAL_TABLE") { + val catalog = newBasicCatalog() + val table = + newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL) + catalog.createTable("db2", table, ignoreIfExists = false) + val actual = catalog.getTable("db2", "external_table1") + assert(actual.tableType === CatalogTableType.EXTERNAL) + } + + test("drop table") { + val catalog = newBasicCatalog() + assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false) + assert(catalog.listTables("db2").toSet == Set("tbl2")) + } + + test("drop table when database/table does not exist") { + val catalog = newBasicCatalog() + // Should always throw exception when the database does not exist + intercept[AnalysisException] { + catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false) + } + intercept[AnalysisException] { + catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true) + } + // Should throw exception when the table does not exist, if ignoreIfNotExists is false + intercept[AnalysisException] { + catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false) + } + catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true) + } + + test("rename table") { + val catalog = newBasicCatalog() + assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + catalog.renameTable("db2", "tbl1", "tblone") + assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2")) 
+ } + + test("rename table when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renameTable("unknown_db", "unknown_table", "unknown_table") + } + intercept[AnalysisException] { + catalog.renameTable("db2", "unknown_table", "unknown_table") + } + } + + test("alter table") { + val catalog = newBasicCatalog() + val tbl1 = catalog.getTable("db2", "tbl1") + catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem"))) + val newTbl1 = catalog.getTable("db2", "tbl1") + assert(!tbl1.properties.contains("toh")) + assert(newTbl1.properties.size == tbl1.properties.size + 1) + assert(newTbl1.properties.get("toh") == Some("frem")) + } + + test("alter table when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db")) + } + intercept[AnalysisException] { + catalog.alterTable("db2", newTable("unknown_table", "db2")) + } + } + + test("get table") { + assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") + } + + test("get table when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.getTable("unknown_db", "unknown_table") + } + intercept[AnalysisException] { + catalog.getTable("db2", "unknown_table") + } + } + + test("list tables without pattern") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { catalog.listTables("unknown_db") } + assert(catalog.listTables("db1").toSet == Set.empty) + assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + } + + test("list tables with pattern") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } + assert(catalog.listTables("db1", "*").toSet == Set.empty) + assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) + assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) + 
assert(catalog.listTables("db2", "*1").toSet == Set("tbl1")) + } + + // -------------------------------------------------------------------------- + // Partitions + // -------------------------------------------------------------------------- + + test("basic create and list partitions") { + val catalog = newEmptyCatalog() + catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false) + catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2))) + } + + test("create partitions when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false) + } + intercept[AnalysisException] { + catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false) + } + } + + test("create partitions that already exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false) + } + catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true) + } + + test("drop partitions") { + val catalog = newBasicCatalog() + assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) + catalog.dropPartitions( + "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false) + assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) + resetState() + val catalog2 = newBasicCatalog() + assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) + catalog2.dropPartitions( + "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false) + assert(catalog2.listPartitions("db2", "tbl2").isEmpty) + } + + test("drop partitions when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + 
catalog.dropPartitions( + "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false) + } + intercept[AnalysisException] { + catalog.dropPartitions( + "db2", "does_not_exist", Seq(), ignoreIfNotExists = false) + } + } + + test("drop partitions that do not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.dropPartitions( + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false) + } + catalog.dropPartitions( + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true) + } + + test("get partition") { + val catalog = newBasicCatalog() + assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec) + assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec) + intercept[AnalysisException] { + catalog.getPartition("db2", "tbl1", part3.spec) + } + } + + test("get partition when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.getPartition("does_not_exist", "tbl1", part1.spec) + } + intercept[AnalysisException] { + catalog.getPartition("db2", "does_not_exist", part1.spec) + } + } + + test("rename partitions") { + val catalog = newBasicCatalog() + val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) + val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) + val newSpecs = Seq(newPart1.spec, newPart2.spec) + catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs) + assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec) + assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec) + // The old partitions should no longer exist + intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) } + intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) } + } + + test("rename partitions when database/table does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + 
catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec)) + } + intercept[AnalysisException] { + catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec)) + } + } + + test("alter partitions") { + val catalog = newBasicCatalog() + try { + // Note: Before altering table partitions in Hive, you *must* set the current database + // to the one that contains the table of interest. Otherwise you will end up with the + // most helpful error message ever: "Unable to alter partition. alter is not possible." + // See HIVE-2742 for more detail. + catalog.setCurrentDatabase("db2") + val newLocation = newUriForDatabase() + // alter but keep spec the same + val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec) + val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec) + catalog.alterPartitions("db2", "tbl2", Seq( + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec) + val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec) + assert(newPart1.storage.locationUri == Some(newLocation)) + assert(newPart2.storage.locationUri == Some(newLocation)) + assert(oldPart1.storage.locationUri != Some(newLocation)) + assert(oldPart2.storage.locationUri != Some(newLocation)) + // alter but change spec, should fail because new partition specs do not exist yet + val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) + val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) + intercept[AnalysisException] { + catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2)) + } + } finally { + // Remember to restore the original current database, which we assume to be "default" + catalog.setCurrentDatabase("default") + } + } + + test("alter partitions when database/table does not exist") { + val catalog = newBasicCatalog() + 
intercept[AnalysisException] { + catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1)) + } + intercept[AnalysisException] { + catalog.alterPartitions("db2", "does_not_exist", Seq(part1)) + } + } + + // -------------------------------------------------------------------------- + // Functions + // -------------------------------------------------------------------------- + + test("basic create and list functions") { + val catalog = newEmptyCatalog() + catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + catalog.createFunction("mydb", newFunc("myfunc")) + assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc")) + } + + test("create function when database does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.createFunction("does_not_exist", newFunc()) + } + } + + test("create function that already exists") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.createFunction("db2", newFunc("func1")) + } + } + + test("drop function") { + val catalog = newBasicCatalog() + assert(catalog.listFunctions("db2", "*").toSet == Set("func1")) + catalog.dropFunction("db2", "func1") + assert(catalog.listFunctions("db2", "*").isEmpty) + } + + test("drop function when database does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.dropFunction("does_not_exist", "something") + } + } + + test("drop function that does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.dropFunction("db2", "does_not_exist") + } + } + + test("get function") { + val catalog = newBasicCatalog() + assert(catalog.getFunction("db2", "func1") == + CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass, + Seq.empty[(String, String)])) + intercept[AnalysisException] { + catalog.getFunction("db2", "does_not_exist") + } + } + + test("get function when database does not exist") { + val catalog = newBasicCatalog() + 
intercept[AnalysisException] { + catalog.getFunction("does_not_exist", "func1") + } + } + + test("rename function") { + val catalog = newBasicCatalog() + val newName = "funcky" + assert(catalog.getFunction("db2", "func1").className == funcClass) + catalog.renameFunction("db2", "func1", newName) + intercept[AnalysisException] { catalog.getFunction("db2", "func1") } + assert(catalog.getFunction("db2", newName).identifier.funcName == newName) + assert(catalog.getFunction("db2", newName).className == funcClass) + intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") } + } + + test("rename function when database does not exist") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renameFunction("does_not_exist", "func1", "func5") + } + } + + test("list functions") { + val catalog = newBasicCatalog() + catalog.createFunction("db2", newFunc("func2")) + catalog.createFunction("db2", newFunc("not_me")) + assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me")) + assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2")) + } + +} + + +/** + * A collection of utility fields and methods for tests related to the [[ExternalCatalog]]. 
+ */ +abstract class CatalogTestUtils { + + // Unimplemented methods + val tableInputFormat: String + val tableOutputFormat: String + def newEmptyCatalog(): ExternalCatalog + + // These fields must be lazy because they rely on fields that are not implemented yet + lazy val storageFormat = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(tableInputFormat), + outputFormat = Some(tableOutputFormat), + serde = None, + serdeProperties = Map.empty) + lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) + lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) + lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) + lazy val funcClass = "org.apache.spark.myFunc" + + /** + * Creates a basic catalog, with the following structure: + * + * default + * db1 + * db2 + * - tbl1 + * - tbl2 + * - part1 + * - part2 + * - func1 + */ + def newBasicCatalog(): ExternalCatalog = { + val catalog = newEmptyCatalog() + // When testing against a real catalog, the default database may already exist + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + catalog.createDatabase(newDb("db1"), ignoreIfExists = false) + catalog.createDatabase(newDb("db2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false) + catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false) + catalog.createFunction("db2", newFunc("func1", Some("db2"))) + catalog + } + + def newFunc(): CatalogFunction = newFunc("funcName") + + def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + + def newDb(name: String): CatalogDatabase = { + CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) + } + + def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db)) + + def newTable(name: String, database: 
Option[String] = None): CatalogTable = { + CatalogTable( + identifier = TableIdentifier(name, database), + tableType = CatalogTableType.EXTERNAL, + storage = storageFormat, + schema = Seq( + CatalogColumn("col1", "int"), + CatalogColumn("col2", "string"), + CatalogColumn("a", "int"), + CatalogColumn("b", "string")), + partitionColumnNames = Seq("a", "b"), + bucketColumnNames = Seq("col1")) + } + + def newFunc(name: String, database: Option[String] = None): CatalogFunction = { + CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)]) + } + + /** + * Whether the catalog's table partitions equal the ones given. + * Note: Hive sets some random serde things, so we just compare the specs here. + */ + def catalogPartitionsEqual( + catalog: ExternalCatalog, + db: String, + table: String, + parts: Seq[CatalogTablePartition]): Boolean = { + catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala index 63a7b2c..0605daa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog /** Test suite for the [[InMemoryCatalog]]. 
*/ -class InMemoryCatalogSuite extends CatalogTestCases { +class InMemoryCatalogSuite extends ExternalCatalogSuite { protected override val utils: CatalogTestUtils = new CatalogTestUtils { override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat" http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 1933be5..ba5d8ce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias} /** * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented. * - * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]]. + * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]]. * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method * signatures but do not extend a common parent. This is largely by design but * unfortunately leads to very similar test code in two places. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 47c043a..dbbdf11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.sql.sources.BaseRelation @@ -258,7 +259,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def isCached(tableName: String): Boolean = { - sparkSession.isCached(tableName) + sparkSession.catalog.isCached(tableName) } /** @@ -267,7 +268,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ private[sql] def isCached(qName: Dataset[_]): Boolean = { - sparkSession.isCached(qName) + sparkSession.cacheManager.lookupCachedData(qName).nonEmpty } /** @@ -276,7 +277,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def cacheTable(tableName: String): Unit = { - sparkSession.cacheTable(tableName) + sparkSession.catalog.cacheTable(tableName) } /** @@ -285,7 +286,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def uncacheTable(tableName: String): Unit = { - sparkSession.uncacheTable(tableName) + sparkSession.catalog.uncacheTable(tableName) } /** @@ -293,7 +294,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def clearCache(): Unit = { - sparkSession.clearCache() + sparkSession.catalog.clearCache() } // scalastyle:off @@ -507,7 +508,7 @@ 
class SQLContext private[sql]( */ @Experimental def createExternalTable(tableName: String, path: String): DataFrame = { - sparkSession.createExternalTable(tableName, path) + sparkSession.catalog.createExternalTable(tableName, path) } /** @@ -523,7 +524,7 @@ class SQLContext private[sql]( tableName: String, path: String, source: String): DataFrame = { - sparkSession.createExternalTable(tableName, path, source) + sparkSession.catalog.createExternalTable(tableName, path, source) } /** @@ -539,7 +540,7 @@ class SQLContext private[sql]( tableName: String, source: String, options: java.util.Map[String, String]): DataFrame = { - sparkSession.createExternalTable(tableName, source, options) + sparkSession.catalog.createExternalTable(tableName, source, options) } /** @@ -556,7 +557,7 @@ class SQLContext private[sql]( tableName: String, source: String, options: Map[String, String]): DataFrame = { - sparkSession.createExternalTable(tableName, source, options) + sparkSession.catalog.createExternalTable(tableName, source, options) } /** @@ -573,7 +574,7 @@ class SQLContext private[sql]( source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame = { - sparkSession.createExternalTable(tableName, source, schema, options) + sparkSession.catalog.createExternalTable(tableName, source, schema, options) } /** @@ -591,7 +592,7 @@ class SQLContext private[sql]( source: String, schema: StructType, options: Map[String, String]): DataFrame = { - sparkSession.createExternalTable(tableName, source, schema, options) + sparkSession.catalog.createExternalTable(tableName, source, schema, options) } /** @@ -611,7 +612,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def dropTempTable(tableName: String): Unit = { - sparkSession.dropTempTable(tableName) + sparkSession.catalog.dropTempTable(tableName) } /** @@ -700,7 +701,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tables(): DataFrame = { - sparkSession.tables() + Dataset.ofRows(sparkSession, 
ShowTablesCommand(None, None)) } /** @@ -712,7 +713,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tables(databaseName: String): DataFrame = { - sparkSession.tables(databaseName) + Dataset.ofRows(sparkSession, ShowTablesCommand(Some(databaseName), None)) } /** @@ -730,7 +731,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(): Array[String] = { - sparkSession.tableNames() + sparkSession.catalog.listTables().collect().map(_.name) } /** @@ -740,7 +741,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { - sparkSession.tableNames(databaseName) + sparkSession.catalog.listTables(databaseName).collect().map(_.name) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a0f0bd3..6477f42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -31,16 +31,16 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalog.Catalog import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.ShowTablesCommand -import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation} +import 
org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState} +import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager @@ -191,10 +191,6 @@ class SparkSession private( | Methods for accessing or mutating configurations | * -------------------------------------------------- */ - @transient private lazy val _conf: RuntimeConfig = { - new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf) - } - /** * Runtime configuration interface for Spark. * @@ -205,7 +201,9 @@ class SparkSession private( * @group config * @since 2.0.0 */ - def conf: RuntimeConfig = _conf + @transient lazy val conf: RuntimeConfig = { + new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf) + } /** * Set Spark SQL configuration properties. @@ -274,61 +272,6 @@ class SparkSession private( } - /* ------------------------------------- * - | Methods related to cache management | - * ------------------------------------- */ - - /** - * Returns true if the table is currently cached in-memory. - * - * @group cachemgmt - * @since 2.0.0 - */ - def isCached(tableName: String): Boolean = { - cacheManager.lookupCachedData(table(tableName)).nonEmpty - } - - /** - * Caches the specified table in-memory. - * - * @group cachemgmt - * @since 2.0.0 - */ - def cacheTable(tableName: String): Unit = { - cacheManager.cacheQuery(table(tableName), Some(tableName)) - } - - /** - * Removes the specified table from the in-memory cache. - * - * @group cachemgmt - * @since 2.0.0 - */ - def uncacheTable(tableName: String): Unit = { - cacheManager.uncacheQuery(table(tableName)) - } - - /** - * Removes all cached tables from the in-memory cache. 
- * - * @group cachemgmt - * @since 2.0.0 - */ - def clearCache(): Unit = { - cacheManager.clearCache() - } - - /** - * Returns true if the [[Dataset]] is currently cached in-memory. - * - * @group cachemgmt - * @since 2.0.0 - */ - protected[sql] def isCached(qName: Dataset[_]): Boolean = { - cacheManager.lookupCachedData(qName).nonEmpty - } - - /* --------------------------------- * | Methods for creating DataFrames | * --------------------------------- */ @@ -605,139 +548,18 @@ class SparkSession private( } - /* --------------------------- * - | Methods related to tables | - * --------------------------- */ - - /** - * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. - * It will use the default data source configured by spark.sql.sources.default. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - def createExternalTable(tableName: String, path: String): DataFrame = { - val dataSourceName = sessionState.conf.defaultDataSourceName - createExternalTable(tableName, path, dataSourceName) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - def createExternalTable(tableName: String, path: String, source: String): DataFrame = { - createExternalTable(tableName, source, Map("path" -> path)) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. 
- * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - def createExternalTable( - tableName: String, - source: String, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.asScala.toMap) - } - - /** - * :: Experimental :: - * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - def createExternalTable( - tableName: String, - source: String, - options: Map[String, String]): DataFrame = { - val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - val cmd = - CreateTableUsing( - tableIdent, - userSpecifiedSchema = None, - source, - temporary = false, - options, - allowExisting = false, - managedIfNoPath = false) - executePlan(cmd).toRdd - table(tableIdent) - } - - /** - * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.asScala.toMap) - } + /* ------------------------ * + | Catalog-related methods | + * ------------------------ */ /** - * :: Experimental :: - * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. 
+ * Interface through which the user may create, drop, alter or query underlying + * databases, tables, functions etc.
* * @group ddl_ops * @since 2.0.0 */ - @Experimental - def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: Map[String, String]): DataFrame = { - val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - val cmd = - CreateTableUsing( - tableIdent, - userSpecifiedSchema = Some(schema), - source, - temporary = false, - options, - allowExisting = false, - managedIfNoPath = false) - executePlan(cmd).toRdd - table(tableIdent) - } - - /** - * Drops the temporary table with the given table name in the catalog. - * If the table has been cached/persisted before, it's also unpersisted. - * - * @param tableName the name of the table to be unregistered. - * @group ddl_ops - * @since 2.0.0 - */ - def dropTempTable(tableName: String): Unit = { - cacheManager.tryUncacheQuery(table(tableName)) - sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true) - } + @transient lazy val catalog: Catalog = new CatalogImpl(self) /** * Returns the specified table as a [[DataFrame]]. @@ -749,55 +571,11 @@ class SparkSession private( table(sessionState.sqlParser.parseTableIdentifier(tableName)) } - private def table(tableIdent: TableIdentifier): DataFrame = { + protected[sql] def table(tableIdent: TableIdentifier): DataFrame = { Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent)) } /** - * Returns a [[DataFrame]] containing names of existing tables in the current database. - * The returned DataFrame has two columns, tableName and isTemporary (a Boolean - * indicating if a table is a temporary one or not). - * - * @group ddl_ops - * @since 2.0.0 - */ - def tables(): DataFrame = { - Dataset.ofRows(self, ShowTablesCommand(None, None)) - } - - /** - * Returns a [[DataFrame]] containing names of existing tables in the given database. - * The returned DataFrame has two columns, tableName and isTemporary (a Boolean - * indicating if a table is a temporary one or not). 
- * - * @group ddl_ops - * @since 2.0.0 - */ - def tables(databaseName: String): DataFrame = { - Dataset.ofRows(self, ShowTablesCommand(Some(databaseName), None)) - } - - /** - * Returns the names of tables in the current database as an array. - * - * @group ddl_ops - * @since 2.0.0 - */ - def tableNames(): Array[String] = { - tableNames(sessionState.catalog.getCurrentDatabase) - } - - /** - * Returns the names of tables in the given database as an array. - * - * @group ddl_ops - * @since 2.0.0 - */ - def tableNames(databaseName: String): Array[String] = { - sessionState.catalog.listTables(databaseName).map(_.table).toArray - } - - /** * Registers the given [[DataFrame]] as a temporary table in the catalog. * Temporary tables exist only during the lifetime of this instance of [[SparkSession]]. */ http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala new file mode 100644 index 0000000..868cc3a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} +import org.apache.spark.sql.types.StructType + + +/** + * Catalog interface for Spark. To access this, use `SparkSession.catalog`. + */ +abstract class Catalog { + + /** + * Returns the current default database in this session. + * + * @since 2.0.0 + */ + def currentDatabase: String + + /** + * Sets the current default database in this session. + * + * @since 2.0.0 + */ + def setCurrentDatabase(dbName: String): Unit + + /** + * Returns a list of databases available across all sessions. + * + * @since 2.0.0 + */ + def listDatabases(): Dataset[Database] + + /** + * Returns a list of tables in the current database. + * This includes all temporary tables. + * + * @since 2.0.0 + */ + def listTables(): Dataset[Table] + + /** + * Returns a list of tables in the specified database. + * This includes all temporary tables. + * + * @since 2.0.0 + */ + @throws[AnalysisException]("database does not exist") + def listTables(dbName: String): Dataset[Table] + + /** + * Returns a list of functions registered in the current database. + * This includes all temporary functions + * + * @since 2.0.0 + */ + def listFunctions(): Dataset[Function] + + /** + * Returns a list of functions registered in the specified database. 
+ * This includes all temporary functions + * + * @since 2.0.0 + */ + @throws[AnalysisException]("database does not exist") + def listFunctions(dbName: String): Dataset[Function] + + /** + * Returns a list of columns for the given table in the current database. + * + * @since 2.0.0 + */ + @throws[AnalysisException]("table does not exist") + def listColumns(tableName: String): Dataset[Column] + + /** + * Returns a list of columns for the given table in the specified database. + * + * @since 2.0.0 + */ + @throws[AnalysisException]("database or table does not exist") + def listColumns(dbName: String, tableName: String): Dataset[Column] + + /** + * :: Experimental :: + * Creates an external table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @since 2.0.0 + */ + @Experimental + def createExternalTable(tableName: String, path: String): DataFrame + + /** + * :: Experimental :: + * Creates an external table from the given path based on a data source + * and returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @Experimental + def createExternalTable(tableName: String, path: String, source: String): DataFrame + + /** + * :: Experimental :: + * Creates an external table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @Experimental + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame + + /** + * :: Experimental :: + * (Scala-specific) + * Creates an external table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. 
+ * + * @since 2.0.0 + */ + @Experimental + def createExternalTable( + tableName: String, + source: String, + options: Map[String, String]): DataFrame + + /** + * :: Experimental :: + * Create an external table from the given path based on a data source, a schema and + * a set of options. Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @Experimental + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame + + /** + * :: Experimental :: + * (Scala-specific) + * Create an external table from the given path based on a data source, a schema and + * a set of options. Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @Experimental + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: Map[String, String]): DataFrame + + /** + * Drops the temporary table with the given table name in the catalog. + * If the table has been cached/persisted before, it's also unpersisted. + * + * @param tableName the name of the table to be unregistered. + * @since 2.0.0 + */ + def dropTempTable(tableName: String): Unit + + /** + * Returns true if the table is currently cached in-memory. + * + * @since 2.0.0 + */ + def isCached(tableName: String): Boolean + + /** + * Caches the specified table in-memory. + * + * @since 2.0.0 + */ + def cacheTable(tableName: String): Unit + + /** + * Removes the specified table from the in-memory cache. + * + * @since 2.0.0 + */ + def uncacheTable(tableName: String): Unit + + /** + * Removes all cached tables from the in-memory cache. 
+ * + * @since 2.0.0 + */ + def clearCache(): Unit + +} http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala new file mode 100644 index 0000000..d5de6cd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog + +import javax.annotation.Nullable + +import org.apache.spark.sql.catalyst.DefinedByConstructorParams + + +// Note: all classes here are expected to be wrapped in Datasets and so must extend +// DefinedByConstructorParams for the catalog to be able to create encoders for them. 
+ +class Database( + val name: String, + @Nullable val description: String, + val locationUri: String) + extends DefinedByConstructorParams { + + override def toString: String = { + "Database[" + + s"name='$name', " + + Option(description).map { d => s"description='$d', " }.getOrElse("") + + s"path='$locationUri']" + } + +} + + +class Table( + val name: String, + @Nullable val database: String, + @Nullable val description: String, + val tableType: String, + val isTemporary: Boolean) + extends DefinedByConstructorParams { + + override def toString: String = { + "Table[" + + s"name='$name', " + + Option(database).map { d => s"database='$d', " }.getOrElse("") + + Option(description).map { d => s"description='$d', " }.getOrElse("") + + s"tableType='$tableType', " + + s"isTemporary='$isTemporary']" + } + +} + + +class Column( + val name: String, + @Nullable val description: String, + val dataType: String, + val nullable: Boolean, + val isPartition: Boolean, + val isBucket: Boolean) + extends DefinedByConstructorParams { + + override def toString: String = { + "Column[" + + s"name='$name', " + + Option(description).map { d => s"description='$d', " }.getOrElse("") + + s"dataType='$dataType', " + + s"nullable='$nullable', " + + s"isPartition='$isPartition', " + + s"isBucket='$isBucket']" + } + +} + + +class Function( + val name: String, + @Nullable val description: String, + val className: String, + val isTemporary: Boolean) + extends DefinedByConstructorParams { + + override def toString: String = { + "Function[" + + s"name='$name', " + + Option(description).map { d => s"description='$d', " }.getOrElse("") + + s"className='$className', " + + s"isTemporary='$isTemporary']" + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ebc60ed..e04e130 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -835,9 +835,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx) } val tableType = if (external) { - CatalogTableType.EXTERNAL_TABLE + CatalogTableType.EXTERNAL } else { - CatalogTableType.MANAGED_TABLE + CatalogTableType.MANAGED } val comment = Option(ctx.STRING).map(string) val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) @@ -1083,7 +1083,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val sql = Option(source(query)) val tableDesc = CatalogTable( identifier = visitTableIdentifier(name), - tableType = CatalogTableType.VIRTUAL_VIEW, + tableType = CatalogTableType.VIEW, schema = schema, storage = EmptyStorageFormat, properties = properties, http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index c283bd6..ec3fada 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -32,7 +32,7 @@ case class CacheTableCommand( plan.foreach { logicalPlan => sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName) } - sparkSession.cacheTable(tableName) + sparkSession.catalog.cacheTable(tableName) if (!isLazy) { // Performs eager caching @@ -62,7 +62,7 @@ case class UncacheTableCommand(tableName: 
String) extends RunnableCommand { case object ClearCacheCommand extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.clearCache() + sparkSession.catalog.clearCache() Seq.empty[Row] } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
