Repository: spark Updated Branches: refs/heads/branch-2.0 cf156e611 -> cb090df74
[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent Table/Function/Partitions by Rename #### What changes were proposed in this pull request? So far, the implementation of InMemoryCatalog does not check whether the new/destination table/function/partition already exists when renaming. As a result, the rename silently removes the existing table/function/partition. This PR detects these cases and throws an appropriate exception. #### How was this patch tested? Added the related test cases. They also verify that HiveExternalCatalog detects these errors. Author: gatorsmile <[email protected]> Closes #12960 from gatorsmile/renameInMemoryCatalog. (cherry picked from commit e9131ec277731de4a73026f2fb4559182c236f84) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb090df7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb090df7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb090df7 Branch: refs/heads/branch-2.0 Commit: cb090df74c11dff87ab3d1b1a0644ffdc84877d0 Parents: cf156e6 Author: gatorsmile <[email protected]> Authored: Mon May 9 12:40:30 2016 +0800 Committer: Wenchen Fan <[email protected]> Committed: Mon May 9 12:40:55 2016 +0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/InMemoryCatalog.scala | 45 +++++++++++++++++--- .../catalyst/catalog/ExternalCatalogSuite.scala | 22 ++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cb090df7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index c65f461..676a9e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -59,6 +59,13 @@ class InMemoryCatalog extends ExternalCatalog { } } + private def requireFunctionNotExists(db: String, funcName: String): Unit = { + if (functionExists(db, funcName)) { + throw new AnalysisException( + s"Function already exists: '$funcName' exists in database '$db'") + } + } + private def requireTableExists(db: String, table: String): Unit = { if (!tableExists(db, table)) { throw new AnalysisException( @@ -66,10 +73,34 @@ class InMemoryCatalog extends ExternalCatalog { } } - private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!partitionExists(db, table, spec)) { + private def requireTableNotExists(db: String, table: String): Unit = { + if (tableExists(db, table)) { throw new AnalysisException( - s"Partition not found: database '$db' table '$table' does not contain: '$spec'") + s"Table or view exists: '$table' exists in database '$db'") + } + } + + private def requirePartitionsExist( + db: String, + table: String, + specs: Seq[TablePartitionSpec]): Unit = { + specs foreach { s => + if (!partitionExists(db, table, s)) { + throw new AnalysisException( + s"Partition not found: database '$db' table '$table' does not contain: '$s'") + } + } + } + + private def requirePartitionsNotExist( + db: String, + table: String, + specs: Seq[TablePartitionSpec]): Unit = { + specs foreach { s => + if (partitionExists(db, table, s)) { + throw new AnalysisException( + s"Partition exists: database '$db' table '$table' already contains: '$s'") + } } } @@ -171,6 +202,7 @@ class InMemoryCatalog extends ExternalCatalog { override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { 
requireTableExists(db, oldName) + requireTableNotExists(db, newName) val oldDesc = catalog(db).tables(oldName) oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) catalog(db).tables.put(newName, oldDesc) @@ -272,6 +304,8 @@ class InMemoryCatalog extends ExternalCatalog { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = synchronized { require(specs.size == newSpecs.size, "number of old and new partition specs differ") + requirePartitionsExist(db, table, specs) + requirePartitionsNotExist(db, table, newSpecs) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec) val existingParts = catalog(db).tables(table).partitions @@ -284,8 +318,8 @@ class InMemoryCatalog extends ExternalCatalog { db: String, table: String, parts: Seq[CatalogTablePartition]): Unit = synchronized { + requirePartitionsExist(db, table, parts.map(p => p.spec)) parts.foreach { p => - requirePartitionExists(db, table, p.spec) catalog(db).tables(table).partitions.put(p.spec, p) } } @@ -294,7 +328,7 @@ class InMemoryCatalog extends ExternalCatalog { db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition = synchronized { - requirePartitionExists(db, table, spec) + requirePartitionsExist(db, table, Seq(spec)) catalog(db).tables(table).partitions(spec) } @@ -330,6 +364,7 @@ class InMemoryCatalog extends ExternalCatalog { override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { requireFunctionExists(db, oldName) + requireFunctionNotExists(db, newName) val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db))) catalog(db).functions.remove(oldName) catalog(db).functions.put(newName, newFunc) http://git-wip-us.apache.org/repos/asf/spark/blob/cb090df7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index ae7c503..e347734 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -198,6 +198,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename table when destination table already exists") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renameTable("db2", "tbl1", "tbl2") + } + } + test("alter table") { val catalog = newBasicCatalog() val tbl1 = catalog.getTable("db2", "tbl1") @@ -356,6 +363,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename partitions when the new partition already exists") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part2.spec)) + } + } + test("alter partitions") { val catalog = newBasicCatalog() try { @@ -480,6 +494,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename function when new function already exists") { + val catalog = newBasicCatalog() + catalog.createFunction("db2", newFunc("func2", Some("db2"))) + intercept[AnalysisException] { + catalog.renameFunction("db2", "func1", "func2") + } + } + test("list functions") { val catalog = newBasicCatalog() catalog.createFunction("db2", newFunc("func2")) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
