Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cf156e611 -> cb090df74


[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent 
Table/Function/Partitions by Rename

#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check whether the 
new/destination table/function/partition already exists. Thus, we silently 
remove the existing table/function/partition.

This PR is to detect them and issue an appropriate exception.

#### How was this patch tested?
Added the related test cases. They also verify that HiveExternalCatalog 
detects these errors.

Author: gatorsmile <[email protected]>

Closes #12960 from gatorsmile/renameInMemoryCatalog.

(cherry picked from commit e9131ec277731de4a73026f2fb4559182c236f84)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb090df7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb090df7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb090df7

Branch: refs/heads/branch-2.0
Commit: cb090df74c11dff87ab3d1b1a0644ffdc84877d0
Parents: cf156e6
Author: gatorsmile <[email protected]>
Authored: Mon May 9 12:40:30 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon May 9 12:40:55 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 45 +++++++++++++++++---
 .../catalyst/catalog/ExternalCatalogSuite.scala | 22 ++++++++++
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb090df7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index c65f461..676a9e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -59,6 +59,13 @@ class InMemoryCatalog extends ExternalCatalog {
     }
   }
 
+  private def requireFunctionNotExists(db: String, funcName: String): Unit = {
+    if (functionExists(db, funcName)) {
+      throw new AnalysisException(
+        s"Function already exists: '$funcName' exists in database '$db'")
+    }
+  }
+
   private def requireTableExists(db: String, table: String): Unit = {
     if (!tableExists(db, table)) {
       throw new AnalysisException(
@@ -66,10 +73,34 @@ class InMemoryCatalog extends ExternalCatalog {
     }
   }
 
-  private def requirePartitionExists(db: String, table: String, spec: 
TablePartitionSpec): Unit = {
-    if (!partitionExists(db, table, spec)) {
+  private def requireTableNotExists(db: String, table: String): Unit = {
+    if (tableExists(db, table)) {
       throw new AnalysisException(
-        s"Partition not found: database '$db' table '$table' does not contain: 
'$spec'")
+        s"Table or view exists: '$table' exists in database '$db'")
+    }
+  }
+
+  private def requirePartitionsExist(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec]): Unit = {
+    specs foreach { s =>
+      if (!partitionExists(db, table, s)) {
+        throw new AnalysisException(
+          s"Partition not found: database '$db' table '$table' does not 
contain: '$s'")
+      }
+    }
+  }
+
+  private def requirePartitionsNotExist(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec]): Unit = {
+    specs foreach { s =>
+      if (partitionExists(db, table, s)) {
+        throw new AnalysisException(
+          s"Partition exists: database '$db' table '$table' already contains: 
'$s'")
+      }
     }
   }
 
@@ -171,6 +202,7 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def renameTable(db: String, oldName: String, newName: String): Unit 
= synchronized {
     requireTableExists(db, oldName)
+    requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, 
Some(db)))
     catalog(db).tables.put(newName, oldDesc)
@@ -272,6 +304,8 @@ class InMemoryCatalog extends ExternalCatalog {
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
     require(specs.size == newSpecs.size, "number of old and new partition 
specs differ")
+    requirePartitionsExist(db, table, specs)
+    requirePartitionsNotExist(db, table, newSpecs)
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
       val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
       val existingParts = catalog(db).tables(table).partitions
@@ -284,8 +318,8 @@ class InMemoryCatalog extends ExternalCatalog {
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition]): Unit = synchronized {
+    requirePartitionsExist(db, table, parts.map(p => p.spec))
     parts.foreach { p =>
-      requirePartitionExists(db, table, p.spec)
       catalog(db).tables(table).partitions.put(p.spec, p)
     }
   }
@@ -294,7 +328,7 @@ class InMemoryCatalog extends ExternalCatalog {
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = synchronized {
-    requirePartitionExists(db, table, spec)
+    requirePartitionsExist(db, table, Seq(spec))
     catalog(db).tables(table).partitions(spec)
   }
 
@@ -330,6 +364,7 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def renameFunction(db: String, oldName: String, newName: String): 
Unit = synchronized {
     requireFunctionExists(db, oldName)
+    requireFunctionNotExists(db, newName)
     val newFunc = getFunction(db, oldName).copy(identifier = 
FunctionIdentifier(newName, Some(db)))
     catalog(db).functions.remove(oldName)
     catalog(db).functions.put(newName, newFunc)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb090df7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae7c503..e347734 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -198,6 +198,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     }
   }
 
+  test("rename table when destination table already exists") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.renameTable("db2", "tbl1", "tbl2")
+    }
+  }
+
   test("alter table") {
     val catalog = newBasicCatalog()
     val tbl1 = catalog.getTable("db2", "tbl1")
@@ -356,6 +363,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     }
   }
 
+  test("rename partitions when the new partition already exists") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part2.spec))
+    }
+  }
+
   test("alter partitions") {
     val catalog = newBasicCatalog()
     try {
@@ -480,6 +494,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     }
   }
 
+  test("rename function when new function already exists") {
+    val catalog = newBasicCatalog()
+    catalog.createFunction("db2", newFunc("func2", Some("db2")))
+    intercept[AnalysisException] {
+      catalog.renameFunction("db2", "func1", "func2")
+    }
+  }
+
   test("list functions") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to