Repository: spark
Updated Branches:
  refs/heads/branch-1.2 09eef3b5c -> c6ea6d4b7


[SPARK-4943][SQL] Allow table name having dot for db/catalog

This pull request only fixes the parsing error and changes the API to use tableIdentifier.
The change for joining datasources across different catalogs is not included in this pull request.

Author: Alex Liu <[email protected]>

Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits:

343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review
29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests
6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error
3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support 
db/catalog ...

(cherry picked from commit 4b39fd1e63188821fc84a13f7ccb6e94277f4be7)
Signed-off-by: Michael Armbrust <[email protected]>

Conflicts:
        sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
        
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ea6d4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ea6d4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ea6d4b

Branch: refs/heads/branch-1.2
Commit: c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f
Parents: 09eef3b
Author: Alex Liu <[email protected]>
Authored: Sat Jan 10 13:23:09 2015 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Sat Jan 10 13:42:36 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |   6 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   8 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   | 106 +++++++++----------
 .../sql/catalyst/analysis/unresolved.scala      |   3 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |   2 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  20 ++--
 .../analysis/DecimalPrecisionSuite.scala        |   2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |   4 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  52 ++++++---
 .../org/apache/spark/sql/hive/HiveQl.scala      |  29 +++--
 .../org/apache/spark/sql/hive/TestHive.scala    |   2 +-
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../spark/sql/hive/execution/commands.scala     |   2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   4 +-
 17 files changed, 143 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index a2bcd73..3141942 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -177,10 +177,10 @@ class SqlParser extends AbstractSparkSQLParser {
     joinedRelation | relationFactor
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
-    ( ident ~ (opt(AS) ~> opt(ident)) ^^ {
-        case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
+    ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+        case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
       }
-    | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, 
s) }
+      | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, 
s) }
     )
 
   protected lazy val joinedRelation: Parser[LogicalPlan] =

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index facbd8b..afa6b17 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -136,11 +136,11 @@ class Analyzer(catalog: Catalog, registry: 
FunctionRegistry, caseSensitive: Bool
    */
   object ResolveRelations extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), 
_, _, _) =>
+      case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, 
_, _) =>
         i.copy(
-          table = 
EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
-      case UnresolvedRelation(databaseName, name, alias) =>
-        catalog.lookupRelation(databaseName, name, alias)
+          table = 
EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+      case UnresolvedRelation(tableIdentifier, alias) =>
+        catalog.lookupRelation(tableIdentifier, alias)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 0415d74..df8d03b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,77 +28,74 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
-  def tableExists(db: Option[String], tableName: String): Boolean
+  def tableExists(tableIdentifier: Seq[String]): Boolean
 
   def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
-    alias: Option[String] = None): LogicalPlan
+      tableIdentifier: Seq[String],
+      alias: Option[String] = None): LogicalPlan
 
-  def registerTable(databaseName: Option[String], tableName: String, plan: 
LogicalPlan): Unit
+  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
 
-  def unregisterTable(databaseName: Option[String], tableName: String): Unit
+  def unregisterTable(tableIdentifier: Seq[String]): Unit
 
   def unregisterAllTables(): Unit
 
-  protected def processDatabaseAndTableName(
-      databaseName: Option[String],
-      tableName: String): (Option[String], String) = {
+  protected def processTableIdentifier(tableIdentifier: Seq[String]): 
Seq[String] = {
     if (!caseSensitive) {
-      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+      tableIdentifier.map(_.toLowerCase)
     } else {
-      (databaseName, tableName)
+      tableIdentifier
     }
   }
 
-  protected def processDatabaseAndTableName(
-      databaseName: String,
-      tableName: String): (String, String) = {
-    if (!caseSensitive) {
-      (databaseName.toLowerCase, tableName.toLowerCase)
+  protected def getDbTableName(tableIdent: Seq[String]): String = {
+    val size = tableIdent.size
+    if (size <= 2) {
+      tableIdent.mkString(".")
     } else {
-      (databaseName, tableName)
+      tableIdent.slice(size - 2, size).mkString(".")
     }
   }
+
+  protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) 
= {
+    (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
+  }
 }
 
 class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    tables += ((tblName, plan))
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables += ((getDbTableName(tableIdent), plan))
   }
 
-  override def unregisterTable(
-      databaseName: Option[String],
-      tableName: String) = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    tables -= tblName
+  override def unregisterTable(tableIdentifier: Seq[String]) = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables -= getDbTableName(tableIdent)
   }
 
   override def unregisterAllTables() = {
     tables.clear()
   }
 
-  override def tableExists(db: Option[String], tableName: String): Boolean = {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    tables.get(tblName) match {
+  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables.get(getDbTableName(tableIdent)) match {
       case Some(_) => true
       case None => false
     }
   }
 
   override def lookupRelation(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       alias: Option[String] = None): LogicalPlan = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: 
$tableName"))
-    val tableWithQualifiers = Subquery(tblName, table)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val tableFullName = getDbTableName(tableIdent)
+    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: 
$tableFullName"))
+    val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so 
that attributes are
     // properly qualified with this alias.
@@ -117,41 +114,39 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
-  abstract override def tableExists(db: Option[String], tableName: String): 
Boolean = {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    overrides.get((dbName, tblName)) match {
+  abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.get(getDBTable(tableIdent)) match {
       case Some(_) => true
-      case None => super.tableExists(db, tableName)
+      case None => super.tableExists(tableIdentifier)
     }
   }
 
   abstract override def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None): LogicalPlan = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    val overriddenTable = overrides.get((dbName, tblName))
-    val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val overriddenTable = overrides.get(getDBTable(tableIdent))
+    val tableWithQualifers = overriddenTable.map(r => 
Subquery(tableIdent.last, r))
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so 
that attributes are
     // properly qualified with this alias.
     val withAlias =
       tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
 
-    withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
+    withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
   }
 
   override def registerTable(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    overrides.put((dbName, tblName), plan)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.put(getDBTable(tableIdent), plan)
   }
 
-  override def unregisterTable(databaseName: Option[String], tableName: 
String): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    overrides.remove((dbName, tblName))
+  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.remove(getDBTable(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -167,22 +162,21 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
-  def tableExists(db: Option[String], tableName: String): Boolean = {
+  def tableExists(tableIdentifier: Seq[String]): Boolean = {
     throw new UnsupportedOperationException
   }
 
   def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None) = {
     throw new UnsupportedOperationException
   }
 
-  def registerTable(databaseName: Option[String], tableName: String, plan: 
LogicalPlan): Unit = {
+  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
-  def unregisterTable(databaseName: Option[String], tableName: String): Unit = 
{
+  def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     throw new UnsupportedOperationException
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 77d84e1..71a738a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: 
TreeType, function: Str
  * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
  */
 case class UnresolvedRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 70dabc4..c07b7ec 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -279,7 +279,7 @@ package object dsl {
 
     def insertInto(tableName: String, overwrite: Boolean = false) =
       InsertIntoTable(
-        analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, 
overwrite)
+        analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, 
overwrite)
 
     def analyze = analysis.SimpleAnalyzer(logicalPlan)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 33a3cba..a8bc773 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -42,8 +42,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     AttributeReference("e", ShortType)())
 
   before {
-    caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
-    caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+    caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
+    caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
   }
 
   test("analyze project") {
@@ -54,45 +54,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     val e = intercept[TreeNodeException[_]] {
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL"))))
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
     }
     assert(e.getMessage().toLowerCase.contains("unresolved"))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
   }
 
   test("resolve relations") {
     val e = intercept[RuntimeException] {
-      caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
+      caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None))
     }
     assert(e.getMessage == "Table Not Found: tAbLe")
 
     assert(
-      caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+      caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
         testRelation)
 
     assert(
-      caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
+      caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) ===
         testRelation)
 
     assert(
-      caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+      caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
         testRelation)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index d5b7d27..fc7a001 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with 
BeforeAndAfter {
   val f: Expression = UnresolvedAttribute("f")
 
   before {
-    catalog.registerTable(None, "table", relation)
+    catalog.registerTable(Seq("table"), relation)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ebd4cc9..832d5b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    catalog.registerTable(None, tableName, rdd.queryExecution.logical)
+    catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
   }
 
   /**
@@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def dropTempTable(tableName: String): Unit = {
     tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(None, tableName)
+    catalog.unregisterTable(Seq(tableName))
   }
 
   /**
@@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
-    new SchemaRDD(this, catalog.lookupRelation(None, tableName))
+    new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
 
   /**
    * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fd5f4ab..3cf9209 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit =
-    sqlContext.executePlan(
-      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, 
logicalPlan, overwrite)).toRdd
+    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
+      Map.empty, logicalPlan, overwrite)).toRdd
 
   /**
    * :: Experimental ::

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8b4cf5b..1dcbaf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -296,8 +296,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(None, "left", None)
-    val right = UnresolvedRelation(None, "right", None)
+    val left = UnresolvedRelation(Seq("left"), None)
+    val right = UnresolvedRelation(Seq("right"), None)
 
     checkAnswer(
       left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e006d3b..4a6d7d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * in the Hive metastore.
    */
   def analyze(tableName: String) {
-    val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, 
tableName))
+    val relation = 
EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
 
     relation match {
       case relation: MetastoreRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d8b10b7..bbf6752 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.TableType
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => 
TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, 
HiveException}
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -55,18 +56,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
 
   val caseSensitive: Boolean = false
 
-  def tableExists(db: Option[String], tableName: String): Boolean = {
-    val (databaseName, tblName) = processDatabaseAndTableName(
-      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
-    client.getTable(databaseName, tblName, false) != null
+  def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+      hive.sessionState.getCurrentDatabase)
+    val tblName = tableIdent.last
+    try {
+      client.getTable(databaseName, tblName) != null
+    } catch {
+      case ie: InvalidTableException => false
+    }
   }
 
   def lookupRelation(
-      db: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       alias: Option[String]): LogicalPlan = synchronized {
-    val (databaseName, tblName) =
-      
processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), 
tableName)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+      hive.sessionState.getCurrentDatabase)
+    val tblName = tableIdent.last
     val table = client.getTable(databaseName, tblName)
     if (table.isView) {
       // if the unresolved relation is from hive view
@@ -249,6 +257,26 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
     }
   }
 
+  protected def processDatabaseAndTableName(
+      databaseName: Option[String],
+      tableName: String): (Option[String], String) = {
+    if (!caseSensitive) {
+      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
+
+  protected def processDatabaseAndTableName(
+      databaseName: String,
+      tableName: String): (String, String) = {
+    if (!caseSensitive) {
+      (databaseName.toLowerCase, tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
+
   /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
@@ -268,7 +296,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
         val databaseName = 
dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
         // Get the CreateTableDesc from Hive SemanticAnalyzer
-        val desc: Option[CreateTableDesc] = if 
(tableExists(Some(databaseName), tblName)) {
+        val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, 
tblName))) {
           None
         } else {
           val sa = new SemanticAnalyzer(hive.hiveconf) {
@@ -340,15 +368,13 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory 
tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory 
[[OverrideCatalog]].
    */
-  override def registerTable(
-      databaseName: Option[String], tableName: String, plan: LogicalPlan): 
Unit = ???
+  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): 
Unit = ???
 
   /**
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory 
tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory 
[[OverrideCatalog]].
    */
-  override def unregisterTable(
-      databaseName: Option[String], tableName: String): Unit = ???
+  override def unregisterTable(tableIdentifier: Seq[String]): Unit = ???
 
   override def unregisterAllTables() = {}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index cd4e5a2..3d39e6c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -393,6 +393,15 @@ private[hive] object HiveQl {
     (db, tableName)
   }
 
+  protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+    tableNameParts.getChildren.map { case Token(part, Nil) => 
cleanIdentifier(part) } match {
+      case Seq(tableOnly) => Seq(tableOnly)
+      case Seq(databaseName, table) => Seq(databaseName, table)
+      case other => sys.error("Hive only supports tables names like 
'tableName' " +
+        s"or 'databaseName.tableName', found '$other'")
+    }
+  }
+
   protected def nodeToPlan(node: Node): LogicalPlan = node match {
     // Special drop table that also uncaches.
     case Token("TOK_DROPTABLE",
@@ -446,16 +455,16 @@ private[hive] object HiveQl {
               case Token(".", dbName :: tableName :: Nil) =>
                 // It is describing a table with the format like "describe 
db.table".
                 // TODO: Actually, a user may mean tableName.columnName. Need 
to resolve this issue.
-                val (db, tableName) = extractDbNameTableName(nameParts.head)
+                val tableIdent = extractTableIdent(nameParts.head)
                 DescribeCommand(
-                  UnresolvedRelation(db, tableName, None), extended.isDefined)
+                  UnresolvedRelation(tableIdent, None), extended.isDefined)
               case Token(".", dbName :: tableName :: colName :: Nil) =>
                 // It is describing a column with the format like "describe 
db.table column".
                 NativePlaceholder
               case tableName =>
                 // It is describing a table with the format like "describe 
table".
                 DescribeCommand(
-                  UnresolvedRelation(None, tableName.getText, None),
+                  UnresolvedRelation(Seq(tableName.getText), None),
                   extended.isDefined)
             }
           }
@@ -705,13 +714,15 @@ private[hive] object HiveQl {
           nonAliasClauses)
       }
 
-      val (db, tableName) =
+      val tableIdent =
         tableNameParts.getChildren.map{ case Token(part, Nil) => 
cleanIdentifier(part)} match {
-          case Seq(tableOnly) => (None, tableOnly)
-          case Seq(databaseName, table) => (Some(databaseName), table)
+          case Seq(tableOnly) => Seq(tableOnly)
+          case Seq(databaseName, table) => Seq(databaseName, table)
+          case other => sys.error("Hive only supports tables names like 
'tableName' " +
+            s"or 'databaseName.tableName', found '$other'")
       }
       val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
-      val relation = UnresolvedRelation(db, tableName, alias)
+      val relation = UnresolvedRelation(tableIdent, alias)
 
       // Apply sampling if requested.
       (bucketSampleClause orElse splitSampleClause).map {
@@ -830,7 +841,7 @@ private[hive] object HiveQl {
       val Some(tableNameParts) :: partitionClause :: Nil =
         getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
 
-      val (db, tableName) = extractDbNameTableName(tableNameParts)
+      val tableIdent = extractTableIdent(tableNameParts)
 
       val partitionKeys = partitionClause.map(_.getChildren.map {
         // Parse partitions. We also make keys case insensitive.
@@ -840,7 +851,7 @@ private[hive] object HiveQl {
           cleanIdentifier(key.toLowerCase) -> None
       }.toMap).getOrElse(Map.empty)
 
-      InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, 
query, overwrite)
+      InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, 
query, overwrite)
 
     case a: ASTNode =>
       throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index bb79ad5..53b6487 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -169,7 +169,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
       // Make sure any test tables referenced are loaded.
       val referencedTables =
         describedTables ++
-        logical.collect { case UnresolvedRelation(databaseName, name, _) => 
name }
+        logical.collect { case UnresolvedRelation(tableIdent, _) => 
tableIdent.last }
       val referencedTestTables = referencedTables.filter(testTables.contains)
       logDebug(s"Query references test tables: 
${referencedTestTables.mkString(", ")}")
       referencedTestTables.foreach(loadTestTable)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index b83689c..cd3c440 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -56,7 +56,7 @@ case class CreateTableAsSelect(
     sc.catalog.createTable(database, tableName, query.output, allowExisting, 
desc)
 
     // Get the Metastore Relation
-    sc.catalog.lookupRelation(Some(database), tableName, None) match {
+    sc.catalog.lookupRelation(Seq(database, tableName), None) match {
       case r: MetastoreRelation => r
     }
   }
@@ -65,7 +65,7 @@ case class CreateTableAsSelect(
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (sc.catalog.tableExists(Some(database), tableName)) {
+    if (sc.catalog.tableExists(Seq(database, tableName))) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 903075e..a42a7c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -56,7 +56,7 @@ case class DropTable(tableName: String, ifExists: Boolean) 
extends LeafNode with
   override protected lazy val sideEffectResult: Seq[Row] = {
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(None, tableName)
+    hiveContext.catalog.unregisterTable(Seq(tableName))
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ea6d4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a90fc02..f5a3192 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with 
BeforeAndAfterAll {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+      catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -123,7 +123,7 @@ class StatisticsSuite extends QueryTest with 
BeforeAndAfterAll {
     intercept[NotImplementedError] {
       analyze("tempTable")
     }
-    catalog.unregisterTable(None, "tempTable")
+    catalog.unregisterTable(Seq("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to