[SPARK-13080][SQL] Implement new Catalog API using Hive

## What changes were proposed in this pull request?
This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.

*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.

*Why is this patch so big?* I had to refactor `HiveClient` to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy.

The new class hierarchy is as follows:

```
org.apache.spark.sql.catalyst.catalog.Catalog
  - org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
  - org.apache.spark.sql.hive.HiveCatalog
```

Note that, as of this patch, none of these classes is used anywhere yet. This will come in the future before the Spark 2.0 release.

## How was this patch tested?

All existing unit tests, plus a new `HiveCatalogSuite` that extends `CatalogTestCases`.

Author: Andrew Or <[email protected]>
Author: Reynold Xin <[email protected]>

Closes #11293 from rxin/hive-catalog.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c3832b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c3832b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c3832b2

Branch: refs/heads/master
Commit: 6c3832b26e119626205732b8fd03c8f5ba986896
Parents: 7eb83fe
Author: Andrew Or <[email protected]>
Authored: Sun Feb 21 15:00:24 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sun Feb 21 15:00:24 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/AnalysisException.scala    |   3 +
 .../spark/sql/catalyst/analysis/Catalog.scala   |   9 -
 .../catalyst/analysis/NoSuchItemException.scala |  52 +++
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 154 ++++--
 .../spark/sql/catalyst/catalog/interface.scala  | 190 ++++----
 .../sql/catalyst/catalog/CatalogTestCases.scala | 284 +++++++++-----
 .../org/apache/spark/sql/hive/HiveCatalog.scala | 293 ++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 171 +++++----
 .../org/apache/spark/sql/hive/HiveQl.scala      | 145 +++----
 .../spark/sql/hive/client/HiveClient.scala      | 210 ++++----
 .../spark/sql/hive/client/HiveClientImpl.scala  | 379 +++++++++++++------
 .../hive/execution/CreateTableAsSelect.scala    |  20 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |  20 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   2 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  49 +++
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  40 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  94 ++---
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  25 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |  37 +-
 .../spark/sql/hive/execution/PruningSuite.scala |   2 +-
 21 files changed, 1483 insertions(+), 700 deletions(-)
----------------------------------------------------------------------
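As a reading aid, the snippet below is a minimal, hypothetical sketch (not part of this patch) of how the new Catalog API could be exercised against the in-memory implementation, using the case classes introduced in `interface.scala`. The database name, table name, location URI, and column names are illustrative assumptions only, and, as noted above, nothing in Spark wires these classes up yet.

```
import org.apache.spark.sql.catalyst.catalog._

// Illustrative usage of the new Catalog API. InMemoryCatalog is the reference
// implementation; a HiveCatalog backed by a HiveClient is driven the same way.
val catalog: Catalog = new InMemoryCatalog

// Register a database (names and URI are made up for this sketch).
catalog.createDatabase(
  CatalogDatabase(
    name = "mydb",
    description = "example database",
    locationUri = "file:/tmp/mydb",
    properties = Map.empty),
  ignoreIfExists = false)

// Storage format uses the Option-based fields added in this patch.
val storage = CatalogStorageFormat(
  locationUri = None,
  inputFormat = None,
  outputFormat = None,
  serde = None,
  serdeProperties = Map.empty)

// Register a table in that database.
catalog.createTable(
  db = "mydb",
  tableDefinition = CatalogTable(
    specifiedDatabase = Some("mydb"),
    name = "mytable",
    tableType = CatalogTableType.MANAGED_TABLE,
    storage = storage,
    schema = Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string"))),
  ignoreIfExists = false)

assert(catalog.listTables("mydb") == Seq("mytable"))
```

The same calls would go through `HiveCatalog`, which forwards them to `HiveClientImpl` and wraps Hive client exceptions in `AnalysisException` via the `withClient` helper shown in the diff below.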
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala index f999218..97f28fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql import org.apache.spark.annotation.DeveloperApi + +// TODO: don't swallow original stack trace if it exists + /** * :: DeveloperApi :: * Thrown when a query fails to analyze, usually because the query itself is invalid. http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 67edab5..52b284b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -/** - * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception - * as an AnalysisException with the correct position information. - */ -class NoSuchTableException extends Exception - -class NoSuchDatabaseException extends Exception /** * An interface for looking up relations by name. Used by an [[Analyzer]]. http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala new file mode 100644 index 0000000..81399db --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec + + +/** + * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception + * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. + */ +abstract class NoSuchItemException extends Exception { + override def getMessage: String +} + +class NoSuchDatabaseException(db: String) extends NoSuchItemException { + override def getMessage: String = s"Database $db not found" +} + +class NoSuchTableException(db: String, table: String) extends NoSuchItemException { + override def getMessage: String = s"Table $table not found in database $db" +} + +class NoSuchPartitionException( + db: String, + table: String, + spec: TablePartitionSpec) + extends NoSuchItemException { + + override def getMessage: String = { + s"Partition not found in table $table database $db:\n" + spec.mkString("\n") + } +} + +class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException { + override def getMessage: String = s"Function $func not found in database $db" +} http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 38be61c..cba4de3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException class InMemoryCatalog extends Catalog { import Catalog._ - private class TableDesc(var table: Table) { - val partitions = new mutable.HashMap[PartitionSpec, TablePartition] + private class TableDesc(var table: CatalogTable) { + val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition] } - private class DatabaseDesc(var db: Database) { + private class DatabaseDesc(var db: CatalogDatabase) { val tables = new mutable.HashMap[String, TableDesc] - val functions = new mutable.HashMap[String, Function] + val functions = new mutable.HashMap[String, CatalogFunction] } + // Database name -> description private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc] private def filterPattern(names: Seq[String], pattern: String): Seq[String] = { @@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog { } private def existsFunction(db: String, funcName: String): Boolean = { - assertDbExists(db) + requireDbExists(db) catalog(db).functions.contains(funcName) } private def existsTable(db: String, table: String): Boolean = { - assertDbExists(db) + requireDbExists(db) catalog(db).tables.contains(table) } - private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = { - assertTableExists(db, table) + private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { + requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) } - private def assertDbExists(db: String): Unit = { - if (!catalog.contains(db)) { - throw new AnalysisException(s"Database $db does not exist") - } - 
} - - private def assertFunctionExists(db: String, funcName: String): Unit = { + private def requireFunctionExists(db: String, funcName: String): Unit = { if (!existsFunction(db, funcName)) { throw new AnalysisException(s"Function $funcName does not exist in $db database") } } - private def assertTableExists(db: String, table: String): Unit = { + private def requireTableExists(db: String, table: String): Unit = { if (!existsTable(db, table)) { throw new AnalysisException(s"Table $table does not exist in $db database") } } - private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = { + private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { if (!existsPartition(db, table, spec)) { throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec") } @@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog { // -------------------------------------------------------------------------- override def createDatabase( - dbDefinition: Database, + dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = synchronized { if (catalog.contains(dbDefinition.name)) { if (!ignoreIfExists) { @@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog { } } - override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized { - assertDbExists(db) - assert(db == dbDefinition.name) - catalog(db).db = dbDefinition + override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized { + requireDbExists(dbDefinition.name) + catalog(dbDefinition.name).db = dbDefinition } - override def getDatabase(db: String): Database = synchronized { - assertDbExists(db) + override def getDatabase(db: String): CatalogDatabase = synchronized { + requireDbExists(db) catalog(db).db } + override def databaseExists(db: String): Boolean = synchronized { + catalog.contains(db) + } + override def listDatabases(): Seq[String] = synchronized { catalog.keySet.toSeq } @@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog { filterPattern(listDatabases(), pattern) } + override def setCurrentDatabase(db: String): Unit = { /* no-op */ } + // -------------------------------------------------------------------------- // Tables // -------------------------------------------------------------------------- override def createTable( db: String, - tableDefinition: Table, + tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = synchronized { - assertDbExists(db) + requireDbExists(db) if (existsTable(db, tableDefinition.name)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database") @@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog { db: String, table: String, ignoreIfNotExists: Boolean): Unit = synchronized { - assertDbExists(db) + requireDbExists(db) if (existsTable(db, table)) { catalog(db).tables.remove(table) } else { @@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog { } override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { - assertTableExists(db, oldName) + requireTableExists(db, oldName) val oldDesc = catalog(db).tables(oldName) oldDesc.table = oldDesc.table.copy(name = newName) catalog(db).tables.put(newName, oldDesc) catalog(db).tables.remove(oldName) } - override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized { - assertTableExists(db, table) - assert(table == tableDefinition.name) - catalog(db).tables(table).table = tableDefinition + 
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized { + requireTableExists(db, tableDefinition.name) + catalog(db).tables(tableDefinition.name).table = tableDefinition } - override def getTable(db: String, table: String): Table = synchronized { - assertTableExists(db, table) + override def getTable(db: String, table: String): CatalogTable = synchronized { + requireTableExists(db, table) catalog(db).tables(table).table } override def listTables(db: String): Seq[String] = synchronized { - assertDbExists(db) + requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = synchronized { - assertDbExists(db) + requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog { override def createPartitions( db: String, table: String, - parts: Seq[TablePartition], + parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = synchronized { - assertTableExists(db, table) + requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfExists) { val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec } @@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog { override def dropPartitions( db: String, table: String, - partSpecs: Seq[PartitionSpec], + partSpecs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit = synchronized { - assertTableExists(db, table) + requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfNotExists) { val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s } @@ -244,30 +243,42 @@ class InMemoryCatalog extends Catalog { partSpecs.foreach(existingParts.remove) } - override def alterPartition( + override def renamePartitions( db: String, table: String, - spec: Map[String, String], - newPart: TablePartition): Unit = synchronized { - assertPartitionExists(db, table, spec) - val existingParts = catalog(db).tables(table).partitions - if (spec != newPart.spec) { - // Also a change in specs; remove the old one and add the new one back - existingParts.remove(spec) + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = synchronized { + require(specs.size == newSpecs.size, "number of old and new partition specs differ") + specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => + val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec) + val existingParts = catalog(db).tables(table).partitions + existingParts.remove(oldSpec) + existingParts.put(newSpec, newPart) + } + } + + override def alterPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition]): Unit = synchronized { + parts.foreach { p => + requirePartitionExists(db, table, p.spec) + catalog(db).tables(table).partitions.put(p.spec, p) } - existingParts.put(newPart.spec, newPart) } override def getPartition( db: String, table: String, - spec: Map[String, String]): TablePartition = synchronized { - assertPartitionExists(db, table, spec) + spec: TablePartitionSpec): CatalogTablePartition = synchronized { + requirePartitionExists(db, table, spec) catalog(db).tables(table).partitions(spec) } - override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized { - assertTableExists(db, table) + override def listPartitions( + db: String, + table: String): Seq[CatalogTablePartition] = synchronized { + requireTableExists(db, table) 
catalog(db).tables(table).partitions.values.toSeq } @@ -275,44 +286,39 @@ class InMemoryCatalog extends Catalog { // Functions // -------------------------------------------------------------------------- - override def createFunction( - db: String, - func: Function, - ignoreIfExists: Boolean): Unit = synchronized { - assertDbExists(db) + override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { + requireDbExists(db) if (existsFunction(db, func.name)) { - if (!ignoreIfExists) { - throw new AnalysisException(s"Function $func already exists in $db database") - } + throw new AnalysisException(s"Function $func already exists in $db database") } else { catalog(db).functions.put(func.name, func) } } override def dropFunction(db: String, funcName: String): Unit = synchronized { - assertFunctionExists(db, funcName) + requireFunctionExists(db, funcName) catalog(db).functions.remove(funcName) } - override def alterFunction( - db: String, - funcName: String, - funcDefinition: Function): Unit = synchronized { - assertFunctionExists(db, funcName) - if (funcName != funcDefinition.name) { - // Also a rename; remove the old one and add the new one back - catalog(db).functions.remove(funcName) - } + override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { + requireFunctionExists(db, oldName) + val newFunc = getFunction(db, oldName).copy(name = newName) + catalog(db).functions.remove(oldName) + catalog(db).functions.put(newName, newFunc) + } + + override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized { + requireFunctionExists(db, funcDefinition.name) catalog(db).functions.put(funcDefinition.name, funcDefinition) } - override def getFunction(db: String, funcName: String): Function = synchronized { - assertFunctionExists(db, funcName) + override def getFunction(db: String, funcName: String): CatalogFunction = synchronized { + requireFunctionExists(db, funcName) catalog(db).functions(funcName) } override def listFunctions(db: String, pattern: String): Seq[String] = synchronized { - assertDbExists(db) + requireDbExists(db) filterPattern(catalog(db).functions.keysIterator.toSeq, pattern) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 56aaa6b..dac5f02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import javax.annotation.Nullable + import org.apache.spark.sql.AnalysisException @@ -31,41 +33,59 @@ import org.apache.spark.sql.AnalysisException abstract class Catalog { import Catalog._ + protected def requireDbExists(db: String): Unit = { + if (!databaseExists(db)) { + throw new AnalysisException(s"Database $db does not exist") + } + } + // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- - def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit + def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit def 
dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit /** - * Alter an existing database. This operation does not support renaming. + * Alter a database whose name matches the one specified in `dbDefinition`, + * assuming the database exists. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. */ - def alterDatabase(db: String, dbDefinition: Database): Unit + def alterDatabase(dbDefinition: CatalogDatabase): Unit - def getDatabase(db: String): Database + def getDatabase(db: String): CatalogDatabase + + def databaseExists(db: String): Boolean def listDatabases(): Seq[String] def listDatabases(pattern: String): Seq[String] + def setCurrentDatabase(db: String): Unit + // -------------------------------------------------------------------------- // Tables // -------------------------------------------------------------------------- - def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit + def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit def renameTable(db: String, oldName: String, newName: String): Unit /** - * Alter an existing table. This operation does not support renaming. + * Alter a table whose name that matches the one specified in `tableDefinition`, + * assuming the table exists. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. */ - def alterTable(db: String, table: String, tableDefinition: Table): Unit + def alterTable(db: String, tableDefinition: CatalogTable): Unit - def getTable(db: String, table: String): Table + def getTable(db: String, table: String): CatalogTable def listTables(db: String): Seq[String] @@ -78,43 +98,62 @@ abstract class Catalog { def createPartitions( db: String, table: String, - parts: Seq[TablePartition], + parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit def dropPartitions( db: String, table: String, - parts: Seq[PartitionSpec], + parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit /** - * Alter an existing table partition and optionally override its spec. + * Override the specs of one or many existing table partitions, assuming they exist. + * This assumes index i of `specs` corresponds to index i of `newSpecs`. + */ + def renamePartitions( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit + + /** + * Alter one or many table partitions whose specs that match those specified in `parts`, + * assuming the partitions exist. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. 
*/ - def alterPartition( + def alterPartitions( db: String, table: String, - spec: PartitionSpec, - newPart: TablePartition): Unit + parts: Seq[CatalogTablePartition]): Unit - def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition + def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition // TODO: support listing by pattern - def listPartitions(db: String, table: String): Seq[TablePartition] + def listPartitions(db: String, table: String): Seq[CatalogTablePartition] // -------------------------------------------------------------------------- // Functions // -------------------------------------------------------------------------- - def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit + def createFunction(db: String, funcDefinition: CatalogFunction): Unit def dropFunction(db: String, funcName: String): Unit + def renameFunction(db: String, oldName: String, newName: String): Unit + /** - * Alter an existing function and optionally override its name. + * Alter a function whose name that matches the one specified in `funcDefinition`, + * assuming the function exists. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. */ - def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit + def alterFunction(db: String, funcDefinition: CatalogFunction): Unit - def getFunction(db: String, funcName: String): Function + def getFunction(db: String, funcName: String): CatalogFunction def listFunctions(db: String, pattern: String): Seq[String] @@ -127,33 +166,30 @@ abstract class Catalog { * @param name name of the function * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc" */ -case class Function( - name: String, - className: String -) +case class CatalogFunction(name: String, className: String) /** * Storage format, used to describe how a partition or a table is stored. */ -case class StorageFormat( - locationUri: String, - inputFormat: String, - outputFormat: String, - serde: String, - serdeProperties: Map[String, String] -) +case class CatalogStorageFormat( + locationUri: Option[String], + inputFormat: Option[String], + outputFormat: Option[String], + serde: Option[String], + serdeProperties: Map[String, String]) /** * A column in a table. */ -case class Column( - name: String, - dataType: String, - nullable: Boolean, - comment: String -) +case class CatalogColumn( + name: String, + // This may be null when used to create views. TODO: make this type-safe; this is left + // as a string due to issues in converting Hive varchars to and from SparkSQL strings. + @Nullable dataType: String, + nullable: Boolean = true, + comment: Option[String] = None) /** @@ -162,10 +198,7 @@ case class Column( * @param spec partition spec values indexed by column name * @param storage storage format of the partition */ -case class TablePartition( - spec: Catalog.PartitionSpec, - storage: StorageFormat -) +case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat) /** @@ -174,40 +207,65 @@ case class TablePartition( * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the * future once we have a better understanding of how we want to handle skewed columns. 
*/ -case class Table( - name: String, - description: String, - schema: Seq[Column], - partitionColumns: Seq[Column], - sortColumns: Seq[Column], - storage: StorageFormat, - numBuckets: Int, - properties: Map[String, String], - tableType: String, - createTime: Long, - lastAccessTime: Long, - viewOriginalText: Option[String], - viewText: Option[String]) { - - require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" || - tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW") +case class CatalogTable( + specifiedDatabase: Option[String], + name: String, + tableType: CatalogTableType, + storage: CatalogStorageFormat, + schema: Seq[CatalogColumn], + partitionColumns: Seq[CatalogColumn] = Seq.empty, + sortColumns: Seq[CatalogColumn] = Seq.empty, + numBuckets: Int = 0, + createTime: Long = System.currentTimeMillis, + lastAccessTime: Long = System.currentTimeMillis, + properties: Map[String, String] = Map.empty, + viewOriginalText: Option[String] = None, + viewText: Option[String] = None) { + + /** Return the database this table was specified to belong to, assuming it exists. */ + def database: String = specifiedDatabase.getOrElse { + throw new AnalysisException(s"table $name did not specify database") + } + + /** Return the fully qualified name of this table, assuming the database was specified. */ + def qualifiedName: String = s"$database.$name" + + /** Syntactic sugar to update a field in `storage`. */ + def withNewStorage( + locationUri: Option[String] = storage.locationUri, + inputFormat: Option[String] = storage.inputFormat, + outputFormat: Option[String] = storage.outputFormat, + serde: Option[String] = storage.serde, + serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = { + copy(storage = CatalogStorageFormat( + locationUri, inputFormat, outputFormat, serde, serdeProperties)) + } + +} + + +case class CatalogTableType private(name: String) +object CatalogTableType { + val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE") + val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE") + val INDEX_TABLE = new CatalogTableType("INDEX_TABLE") + val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW") } /** * A database defined in the catalog. */ -case class Database( - name: String, - description: String, - locationUri: String, - properties: Map[String, String] -) +case class CatalogDatabase( + name: String, + description: String, + locationUri: String, + properties: Map[String, String]) object Catalog { /** - * Specifications of a table partition indexed by column name. + * Specifications of a table partition. Mapping column name to column value. 
*/ - type PartitionSpec = Map[String, String] + type TablePartitionSpec = Map[String, String] } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index 45c5cee..e0d1220 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import org.scalatest.BeforeAndAfterEach + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException @@ -26,18 +28,38 @@ import org.apache.spark.sql.AnalysisException * * Implementations of the [[Catalog]] interface can create test suites by extending this. */ -abstract class CatalogTestCases extends SparkFunSuite { - private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map()) - private val part1 = TablePartition(Map("a" -> "1"), storageFormat) - private val part2 = TablePartition(Map("b" -> "2"), storageFormat) - private val part3 = TablePartition(Map("c" -> "3"), storageFormat) +abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { + private lazy val storageFormat = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(tableInputFormat), + outputFormat = Some(tableOutputFormat), + serde = None, + serdeProperties = Map.empty) + private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) + private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) + private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) private val funcClass = "org.apache.spark.myFunc" + // Things subclasses should override + protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat" + protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat" + protected def newUriForDatabase(): String = "uri" + protected def resetState(): Unit = { } protected def newEmptyCatalog(): Catalog + // Clear all state after each test + override def afterEach(): Unit = { + try { + resetState() + } finally { + super.afterEach() + } + } + /** * Creates a basic catalog, with the following structure: * + * default * db1 * db2 * - tbl1 @@ -48,37 +70,65 @@ abstract class CatalogTestCases extends SparkFunSuite { */ private def newBasicCatalog(): Catalog = { val catalog = newEmptyCatalog() + // When testing against a real catalog, the default database may already exist + catalog.createDatabase(newDb("default"), ignoreIfExists = true) catalog.createDatabase(newDb("db1"), ignoreIfExists = false) catalog.createDatabase(newDb("db2"), ignoreIfExists = false) - catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false) - catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false) catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false) - catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false) + catalog.createFunction("db2", newFunc("func1")) catalog } - 
private def newFunc(): Function = Function("funcname", funcClass) + private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass) + + private def newDb(name: String): CatalogDatabase = { + CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) + } + + private def newTable(name: String, db: String): CatalogTable = { + CatalogTable( + specifiedDatabase = Some(db), + name = name, + tableType = CatalogTableType.EXTERNAL_TABLE, + storage = storageFormat, + schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")), + partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))) + } - private def newDb(name: String = "default"): Database = - Database(name, name + " description", "uri", Map.empty) + private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass) - private def newTable(name: String): Table = - Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0, - None, None) + /** + * Whether the catalog's table partitions equal the ones given. + * Note: Hive sets some random serde things, so we just compare the specs here. + */ + private def catalogPartitionsEqual( + catalog: Catalog, + db: String, + table: String, + parts: Seq[CatalogTablePartition]): Boolean = { + catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet + } - private def newFunc(name: String): Function = Function(name, funcClass) // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- - test("basic create, drop and list databases") { + test("basic create and list databases") { val catalog = newEmptyCatalog() - catalog.createDatabase(newDb(), ignoreIfExists = false) - assert(catalog.listDatabases().toSet == Set("default")) - - catalog.createDatabase(newDb("default2"), ignoreIfExists = false) - assert(catalog.listDatabases().toSet == Set("default", "default2")) + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + assert(catalog.databaseExists("default")) + assert(!catalog.databaseExists("testing")) + assert(!catalog.databaseExists("testing2")) + catalog.createDatabase(newDb("testing"), ignoreIfExists = false) + assert(catalog.databaseExists("testing")) + assert(catalog.listDatabases().toSet == Set("default", "testing")) + catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) + assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2")) + assert(catalog.databaseExists("testing2")) + assert(!catalog.databaseExists("does_not_exist")) } test("get database when a database exists") { @@ -93,7 +143,7 @@ abstract class CatalogTestCases extends SparkFunSuite { test("list databases without pattern") { val catalog = newBasicCatalog() - assert(catalog.listDatabases().toSet == Set("db1", "db2")) + assert(catalog.listDatabases().toSet == Set("default", "db1", "db2")) } test("list databases with pattern") { @@ -107,7 +157,7 @@ abstract class CatalogTestCases extends SparkFunSuite { test("drop database") { val catalog = newBasicCatalog() catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) - assert(catalog.listDatabases().toSet == Set("db2")) + assert(catalog.listDatabases().toSet == Set("default", "db2")) } test("drop database when the database is not empty") { @@ -118,6 +168,7 @@ abstract class CatalogTestCases extends SparkFunSuite { intercept[AnalysisException] { catalog1.dropDatabase("db2", ignoreIfNotExists 
= false, cascade = false) } + resetState() // Throw exception if there are tables left val catalog2 = newBasicCatalog() @@ -125,11 +176,12 @@ abstract class CatalogTestCases extends SparkFunSuite { intercept[AnalysisException] { catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) } + resetState() // When cascade is true, it should drop them val catalog3 = newBasicCatalog() catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) - assert(catalog3.listDatabases().toSet == Set("db1")) + assert(catalog3.listDatabases().toSet == Set("default", "db1")) } test("drop database when the database does not exist") { @@ -144,13 +196,19 @@ abstract class CatalogTestCases extends SparkFunSuite { test("alter database") { val catalog = newBasicCatalog() - catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty)) - assert(catalog.getDatabase("db1").description == "new description") + val db1 = catalog.getDatabase("db1") + // Note: alter properties here because Hive does not support altering other fields + catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) + val newDb1 = catalog.getDatabase("db1") + assert(db1.properties.isEmpty) + assert(newDb1.properties.size == 2) + assert(newDb1.properties.get("k") == Some("v3")) + assert(newDb1.properties.get("good") == Some("true")) } test("alter database should throw exception when the database does not exist") { intercept[AnalysisException] { - newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty)) + newBasicCatalog().alterDatabase(newDb("does_not_exist")) } } @@ -165,61 +223,56 @@ abstract class CatalogTestCases extends SparkFunSuite { assert(catalog.listTables("db2").toSet == Set("tbl2")) } - test("drop table when database / table does not exist") { + test("drop table when database/table does not exist") { val catalog = newBasicCatalog() - // Should always throw exception when the database does not exist intercept[AnalysisException] { catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false) } - intercept[AnalysisException] { catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true) } - // Should throw exception when the table does not exist, if ignoreIfNotExists is false intercept[AnalysisException] { catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false) } - catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true) } test("rename table") { val catalog = newBasicCatalog() - assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) catalog.renameTable("db2", "tbl1", "tblone") assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2")) } - test("rename table when database / table does not exist") { + test("rename table when database/table does not exist") { val catalog = newBasicCatalog() - - intercept[AnalysisException] { // Throw exception when the database does not exist + intercept[AnalysisException] { catalog.renameTable("unknown_db", "unknown_table", "unknown_table") } - - intercept[AnalysisException] { // Throw exception when the table does not exist + intercept[AnalysisException] { catalog.renameTable("db2", "unknown_table", "unknown_table") } } test("alter table") { val catalog = newBasicCatalog() - catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10)) - assert(catalog.getTable("db2", "tbl1").createTime == 10) + val tbl1 = catalog.getTable("db2", "tbl1") + catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem"))) + val newTbl1 = 
catalog.getTable("db2", "tbl1") + assert(!tbl1.properties.contains("toh")) + assert(newTbl1.properties.size == tbl1.properties.size + 1) + assert(newTbl1.properties.get("toh") == Some("frem")) } - test("alter table when database / table does not exist") { + test("alter table when database/table does not exist") { val catalog = newBasicCatalog() - - intercept[AnalysisException] { // Throw exception when the database does not exist - catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table")) + intercept[AnalysisException] { + catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db")) } - - intercept[AnalysisException] { // Throw exception when the table does not exist - catalog.alterTable("db2", "unknown_table", newTable("unknown_table")) + intercept[AnalysisException] { + catalog.alterTable("db2", newTable("unknown_table", "db2")) } } @@ -227,12 +280,11 @@ abstract class CatalogTestCases extends SparkFunSuite { assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1") } - test("get table when database / table does not exist") { + test("get table when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.getTable("unknown_db", "unknown_table") } - intercept[AnalysisException] { catalog.getTable("db2", "unknown_table") } @@ -246,10 +298,7 @@ abstract class CatalogTestCases extends SparkFunSuite { test("list tables with pattern") { val catalog = newBasicCatalog() - - // Test when database does not exist intercept[AnalysisException] { catalog.listTables("unknown_db") } - assert(catalog.listTables("db1", "*").toSet == Set.empty) assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) @@ -263,12 +312,12 @@ abstract class CatalogTestCases extends SparkFunSuite { test("basic create and list partitions") { val catalog = newEmptyCatalog() catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false) - catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false) - assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2)) + catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false) + catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2))) } - test("create partitions when database / table does not exist") { + test("create partitions when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false) @@ -288,16 +337,17 @@ abstract class CatalogTestCases extends SparkFunSuite { test("drop partitions") { val catalog = newBasicCatalog() - assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2)) + assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false) - assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2)) + assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) + resetState() val catalog2 = newBasicCatalog() - assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2)) + assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false) 
assert(catalog2.listPartitions("db2", "tbl2").isEmpty) } - test("drop partitions when database / table does not exist") { + test("drop partitions when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false) @@ -317,14 +367,14 @@ abstract class CatalogTestCases extends SparkFunSuite { test("get partition") { val catalog = newBasicCatalog() - assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1) - assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2) + assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec) + assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec) intercept[AnalysisException] { catalog.getPartition("db2", "tbl1", part3.spec) } } - test("get partition when database / table does not exist") { + test("get partition when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.getPartition("does_not_exist", "tbl1", part1.spec) @@ -334,28 +384,69 @@ abstract class CatalogTestCases extends SparkFunSuite { } } - test("alter partitions") { + test("rename partitions") { + val catalog = newBasicCatalog() + val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) + val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) + val newSpecs = Seq(newPart1.spec, newPart2.spec) + catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs) + assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec) + assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec) + // The old partitions should no longer exist + intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) } + intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) } + } + + test("rename partitions when database/table does not exist") { val catalog = newBasicCatalog() - val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde")) - val partNewSpec = part1.copy(spec = Map("x" -> "10")) - // alter but keep spec the same - catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec) - assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec) - // alter and change spec - catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec) intercept[AnalysisException] { - catalog.getPartition("db2", "tbl2", part1.spec) + catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec)) + } + intercept[AnalysisException] { + catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec)) } - assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec) } - test("alter partition when database / table does not exist") { + test("alter partitions") { + val catalog = newBasicCatalog() + try{ + // Note: Before altering table partitions in Hive, you *must* set the current database + // to the one that contains the table of interest. Otherwise you will end up with the + // most helpful error message ever: "Unable to alter partition. alter is not possible." + // See HIVE-2742 for more detail. 
+ catalog.setCurrentDatabase("db2") + val newLocation = newUriForDatabase() + // alter but keep spec the same + val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec) + val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec) + catalog.alterPartitions("db2", "tbl2", Seq( + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec) + val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec) + assert(newPart1.storage.locationUri == Some(newLocation)) + assert(newPart2.storage.locationUri == Some(newLocation)) + assert(oldPart1.storage.locationUri != Some(newLocation)) + assert(oldPart2.storage.locationUri != Some(newLocation)) + // alter but change spec, should fail because new partition specs do not exist yet + val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) + val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) + intercept[AnalysisException] { + catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2)) + } + } finally { + // Remember to restore the original current database, which we assume to be "default" + catalog.setCurrentDatabase("default") + } + } + + test("alter partitions when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1) + catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1)) } intercept[AnalysisException] { - catalog.alterPartition("db2", "does_not_exist", part1.spec, part1) + catalog.alterPartitions("db2", "does_not_exist", Seq(part1)) } } @@ -366,23 +457,22 @@ abstract class CatalogTestCases extends SparkFunSuite { test("basic create and list functions") { val catalog = newEmptyCatalog() catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false) + catalog.createFunction("mydb", newFunc("myfunc")) assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc")) } test("create function when database does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false) + catalog.createFunction("does_not_exist", newFunc()) } } test("create function that already exists") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false) + catalog.createFunction("db2", newFunc("func1")) } - catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true) } test("drop function") { @@ -421,31 +511,43 @@ abstract class CatalogTestCases extends SparkFunSuite { } } - test("alter function") { + test("rename function") { val catalog = newBasicCatalog() + val newName = "funcky" assert(catalog.getFunction("db2", "func1").className == funcClass) - // alter func but keep name - catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha")) - assert(catalog.getFunction("db2", "func1").className == "muhaha") - // alter func and change name - catalog.alterFunction("db2", "func1", newFunc("funcky")) + catalog.renameFunction("db2", "func1", newName) + intercept[AnalysisException] { catalog.getFunction("db2", "func1") } + assert(catalog.getFunction("db2", newName).name == newName) + assert(catalog.getFunction("db2", newName).className == funcClass) + intercept[AnalysisException] { 
catalog.renameFunction("db2", "does_not_exist", "me") } + } + + test("rename function when database does not exist") { + val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.getFunction("db2", "func1") + catalog.renameFunction("does_not_exist", "func1", "func5") } - assert(catalog.getFunction("db2", "funcky").className == funcClass) + } + + test("alter function") { + val catalog = newBasicCatalog() + assert(catalog.getFunction("db2", "func1").className == funcClass) + catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha")) + assert(catalog.getFunction("db2", "func1").className == "muhaha") + intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) } } test("alter function when database does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.alterFunction("does_not_exist", "func1", newFunc()) + catalog.alterFunction("does_not_exist", newFunc()) } } test("list functions") { val catalog = newBasicCatalog() - catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false) - catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false) + catalog.createFunction("db2", newFunc("func2")) + catalog.createFunction("db2", newFunc("not_me")) assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me")) assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2")) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala new file mode 100644 index 0000000..21b9cfb --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.util.control.NonFatal + +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.thrift.TException + +import org.apache.spark.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NoSuchItemException +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.HiveClient + + +/** + * A persistent implementation of the system catalog using Hive. + * All public methods must be synchronized for thread-safety. 
+ */
+private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
+  import Catalog._
+
+  // Exceptions thrown by the hive client that we would like to wrap
+  private val clientExceptions = Set(
+    classOf[HiveException].getCanonicalName,
+    classOf[TException].getCanonicalName)
+
+  /**
+   * Whether this is an exception thrown by the hive client that should be wrapped.
+   *
+   * Due to classloader isolation issues, pattern matching won't work here so we need
+   * to compare the canonical names of the exceptions, which we assume to be stable.
+   */
+  private def isClientException(e: Throwable): Boolean = {
+    var temp: Class[_] = e.getClass
+    var found = false
+    while (temp != null && !found) {
+      found = clientExceptions.contains(temp.getCanonicalName)
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
+  /**
+   * Run some code involving `client` in a [[synchronized]] block and wrap certain
+   * exceptions thrown in the process in [[AnalysisException]].
+   */
+  private def withClient[T](body: => T): T = synchronized {
+    try {
+      body
+    } catch {
+      case e: NoSuchItemException =>
+        throw new AnalysisException(e.getMessage)
+      case NonFatal(e) if isClientException(e) =>
+        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+    }
+  }
+
+  private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+    if (table.specifiedDatabase != Some(db)) {
+      throw new AnalysisException(
+        s"Provided database $db does not match the one specified in the " +
+        s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+    }
+  }
+
+  private def requireTableExists(db: String, table: String): Unit = {
+    withClient { getTable(db, table) }
+  }
+
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(
+      dbDefinition: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withClient {
+    client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withClient {
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: As of now, this only supports altering database properties!
+   */
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+    val existingDb = getDatabase(dbDefinition.name)
+    if (existingDb.properties == dbDefinition.properties) {
+      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+        s"the provided database properties are the same as the old ones. Hive does not " +
+        s"currently support altering other database fields.")
+    }
+    client.alterDatabase(dbDefinition)
+  }
+
+  override def getDatabase(db: String): CatalogDatabase = withClient {
+    client.getDatabase(db)
+  }
+
+  override def databaseExists(db: String): Boolean = withClient {
+    client.getDatabaseOption(db).isDefined
+  }
+
+  override def listDatabases(): Seq[String] = withClient {
+    client.listDatabases("*")
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = withClient {
+    client.listDatabases(pattern)
+  }
+
+  override def setCurrentDatabase(db: String): Unit = withClient {
+    client.setCurrentDatabase(db)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  override def createTable(
+      db: String,
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    requireDbMatches(db, tableDefinition)
+    client.createTable(tableDefinition, ignoreIfExists)
+  }
+
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    client.dropTable(db, table, ignoreIfNotExists)
+  }
+
+  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+    val newTable = client.getTable(db, oldName).copy(name = newName)
+    client.alterTable(oldName, newTable)
+  }
+
+  /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: As of now, this only supports altering table properties, serde properties,
+   * and num buckets!
+   */
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+    requireDbMatches(db, tableDefinition)
+    requireTableExists(db, tableDefinition.name)
+    client.alterTable(tableDefinition)
+  }
+
+  override def getTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
+  }
+
+  override def listTables(db: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db)
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db, pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.createPartitions(db, table, parts, ignoreIfExists)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
+    // need to implement it here ourselves. This is currently somewhat expensive because
+    // we make multiple synchronous calls to Hive for each partition we want to drop.
+ val partsToDrop = + if (ignoreIfNotExists) { + parts.filter { spec => + try { + getPartition(db, table, spec) + true + } catch { + // Filter out the partitions that do not actually exist + case _: AnalysisException => false + } + } + } else { + parts + } + if (partsToDrop.nonEmpty) { + client.dropPartitions(db, table, partsToDrop) + } + } + + override def renamePartitions( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = withClient { + client.renamePartitions(db, table, specs, newSpecs) + } + + override def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit = withClient { + client.alterPartitions(db, table, newParts) + } + + override def getPartition( + db: String, + table: String, + spec: TablePartitionSpec): CatalogTablePartition = withClient { + client.getPartition(db, table, spec) + } + + override def listPartitions( + db: String, + table: String): Seq[CatalogTablePartition] = withClient { + client.getAllPartitions(db, table) + } + + // -------------------------------------------------------------------------- + // Functions + // -------------------------------------------------------------------------- + + override def createFunction( + db: String, + funcDefinition: CatalogFunction): Unit = withClient { + client.createFunction(db, funcDefinition) + } + + override def dropFunction(db: String, name: String): Unit = withClient { + client.dropFunction(db, name) + } + + override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient { + client.renameFunction(db, oldName, newName) + } + + override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient { + client.alterFunction(db, funcDefinition) + } + + override def getFunction(db: String, funcName: String): CatalogFunction = withClient { + client.getFunction(db, funcName) + } + + override def listFunctions(db: String, pattern: String): Seq[String] = withClient { + client.listFunctions(db, pattern) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c222b00..3788736 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.Warehouse +import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse} import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.ql.metadata._ +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} +import org.apache.spark.sql.catalyst.catalog._ import 
org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ @@ -96,6 +97,8 @@ private[hive] object HiveSerDe { } } + +// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) extends Catalog with Logging { @@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) - private def getQualifiedTableName(tableIdent: TableIdentifier) = { + private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, tableIdent.table.toLowerCase) } - private def getQualifiedTableName(hiveTable: HiveTable) = { + private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, - hiveTable.name.toLowerCase) + t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, + t.name.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ @@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... - val options = table.serdeProperties + val options = table.storage.serdeProperties val resolvedRelation = ResolvedDataSource( @@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val tableType = if (isExternal) { tableProperties.put("EXTERNAL", "TRUE") - ExternalTable + CatalogTableType.EXTERNAL_TABLE } else { tableProperties.put("EXTERNAL", "FALSE") - ManagedTable + CatalogTableType.MANAGED_TABLE } val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf) val dataSource = ResolvedDataSource( hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options) - def newSparkSQLSpecificMetastoreTable(): HiveTable = { - HiveTable( + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { + CatalogTable( specifiedDatabase = Option(dbName), name = tblName, - schema = Nil, - partitionColumns = Nil, tableType = tableType, - properties = tableProperties.toMap, - serdeProperties = options) + schema = Nil, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = options + ), + properties = tableProperties.toMap) } - def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = { - def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { - schema.map { field => - HiveColumn( - name = field.name, - hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), - comment = "") - } - } - + def newHiveCompatibleMetastoreTable( + relation: HadoopFsRelation, + serde: HiveSerDe): CatalogTable = { assert(partitionColumns.isEmpty) assert(relation.partitionColumns.isEmpty) - HiveTable( + CatalogTable( specifiedDatabase = Option(dbName), name = tblName, - schema = schemaToHiveColumn(relation.schema), - partitionColumns = Nil, tableType = tableType, + storage = CatalogStorageFormat( + locationUri = Some(relation.paths.head), + inputFormat = serde.inputFormat, + 
outputFormat = serde.outputFormat, + serde = serde.serde, + serdeProperties = options + ), + schema = relation.schema.map { f => + CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType)) + }, properties = tableProperties.toMap, - serdeProperties = options, - location = Some(relation.paths.head), - viewText = None, // TODO We need to place the SQL string here. - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde) + viewText = None) // TODO: We need to place the SQL string here } // TODO: Support persisting partitioned data source relations in Hive compatible format @@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // specific way. try { logInfo(message) - client.createTable(table) + client.createTable(table, ignoreIfExists = false) } catch { case throwable: Throwable => val warningMessage = @@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte s"it into Hive metastore in Spark SQL specific format." logWarning(warningMessage, throwable) val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable() - client.createTable(sparkSqlSpecificTable) + client.createTable(sparkSqlSpecificTable, ignoreIfExists = false) } case (None, message) => logWarning(message) val hiveTable = newSparkSQLSpecificMetastoreTable() - client.createTable(hiveTable) + client.createTable(hiveTable, ignoreIfExists = false) } } def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - new Path(new Path(client.getDatabase(dbName).location), tblName).toString + new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString } override def tableExists(tableIdent: TableIdentifier): Boolean = { @@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. 
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) - } else if (table.tableType == VirtualView) { + } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) alias match { // because hive use things like `_c0` to build the expanded text @@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText)) } } else { - MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive) + MetastoreRelation( + qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive) } } @@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val schema = if (table.schema.nonEmpty) { table.schema } else { - child.output.map { - attr => new HiveColumn( - attr.name, - HiveMetastoreTypes.toMetastoreType(attr.dataType), null) + child.output.map { a => + CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable) } } val desc = table.copy(schema = schema) - if (hive.convertCTAS && table.serde.isEmpty) { + if (hive.convertCTAS && table.storage.serde.isEmpty) { // Do the conversion when spark.sql.hive.convertCTAS is true and the query // does not specify any storage format (file format and storage handler). if (table.specifiedDatabase.isDefined) { @@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte child ) } else { - val desc = if (table.serde.isEmpty) { + val desc = if (table.storage.serde.isEmpty) { // add default serde - table.copy( + table.withNewStorage( serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { table @@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable( } } -private[hive] case class MetastoreRelation - (databaseName: String, tableName: String, alias: Option[String]) - (val table: HiveTable) - (@transient private val sqlContext: SQLContext) +private[hive] case class MetastoreRelation( + databaseName: String, + tableName: String, + alias: Option[String]) + (val table: CatalogTable, + @transient private val client: HiveClient, + @transient private val sqlContext: SQLContext) extends LeafNode with MultiInstanceRelation with FileRelation { override def equals(other: Any): Boolean = other match { @@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil - @transient val hiveQlTable: Table = { + private def toHiveColumn(c: CatalogColumn): FieldSchema = { + new FieldSchema(c.name, c.dataType, c.comment.orNull) + } + + // TODO: merge this with HiveClientImpl#toHiveTable + @transient val hiveQlTable: HiveTable = { // We start by constructing an API table as Hive performs several important transformations // internally when converting an API table to a QL table. 
val tTable = new org.apache.hadoop.hive.metastore.api.Table() @@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation tTable.setParameters(tableParameters) table.properties.foreach { case (k, v) => tableParameters.put(k, v) } - tTable.setTableType(table.tableType.name) + tTable.setTableType(table.tableType match { + case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString + case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString + case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString + case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString + }) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tTable.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - tTable.setPartitionKeys( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) + sd.setCols(table.schema.map(toHiveColumn).asJava) + tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava) - table.location.foreach(sd.setLocation) - table.inputFormat.foreach(sd.setInputFormat) - table.outputFormat.foreach(sd.setOutputFormat) + table.storage.locationUri.foreach(sd.setLocation) + table.storage.inputFormat.foreach(sd.setInputFormat) + table.storage.outputFormat.foreach(sd.setOutputFormat) val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - table.serde.foreach(serdeInfo.setSerializationLib) + table.storage.serde.foreach(serdeInfo.setSerializationLib) sd.setSerdeInfo(serdeInfo) val serdeParameters = new java.util.HashMap[String, String]() - table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } serdeInfo.setParameters(serdeParameters) - new Table(tTable) + new HiveTable(tTable) } @transient override lazy val statistics: Statistics = Statistics( @@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 - lazy val allPartitions = table.getAllPartitions + private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table) def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { - table.getPartitions(predicates) + client.getPartitionsByFilter(table, predicates) } else { allPartitions } @@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation val tPartition = new org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) - tPartition.setValues(p.values.asJava) + tPartition.setValues(p.spec.values.toList.asJava) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tPartition.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - - sd.setLocation(p.storage.location) - sd.setInputFormat(p.storage.inputFormat) - sd.setOutputFormat(p.storage.outputFormat) + sd.setCols(table.schema.map(toHiveColumn).asJava) + p.storage.locationUri.foreach(sd.setLocation) + p.storage.inputFormat.foreach(sd.setInputFormat) + p.storage.outputFormat.foreach(sd.setOutputFormat) val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo sd.setSerdeInfo(serdeInfo) // maps and lists should be set only after all elements are ready (see HIVE-7975) - 
serdeInfo.setSerializationLib(p.storage.serde)
+      p.storage.serde.foreach(serdeInfo.setSerializationLib)
 
       val serdeParameters = new java.util.HashMap[String, String]()
-      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
@@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-  implicit class SchemaAttribute(f: HiveColumn) {
+  implicit class SchemaAttribute(f: CatalogColumn) {
     def toAttribute: AttributeReference = AttributeReference(
       f.name,
-      HiveMetastoreTypes.toDataType(f.hiveType),
+      HiveMetastoreTypes.toDataType(f.dataType),
      // Since data can be dumped in randomly with no validation, everything is nullable.
      nullable = true
    )(qualifiers = Seq(alias.getOrElse(tableName)))
@@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def inputFiles: Array[String] = {
-    val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+    val partLocations = client
+      .getPartitionsByFilter(table, Nil)
+      .flatMap(_.storage.locationUri)
+      .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.location.getOrElse(
+        table.storage.locationUri.getOrElse(
          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
     }
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
   }
 }
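For reviewers, a note on the error handling in `HiveCatalog` above: it walks each thrown exception's class hierarchy and compares canonical names instead of pattern matching, because the Hive client classes may be loaded by an isolated classloader. Below is a minimal, self-contained sketch of that pattern, not Spark code; `FakeClientException` and `WrappedAnalysisError` are hypothetical stand-ins for the Hive exception types and `AnalysisException`.

```
import scala.util.control.NonFatal

// Illustrative stand-ins, not the real Spark/Hive classes.
class WrappedAnalysisError(msg: String) extends Exception(msg)
class FakeClientException(msg: String) extends Exception(msg)

object ExceptionWrappingSketch {
  // Canonical names of the client exception types we want to wrap.
  private val clientExceptions = Set(classOf[FakeClientException].getCanonicalName)

  // Walk up the class hierarchy and compare canonical names, mirroring the
  // approach of HiveCatalog.isClientException above.
  private def isClientException(e: Throwable): Boolean = {
    var current: Class[_] = e.getClass
    var found = false
    while (current != null && !found) {
      found = clientExceptions.contains(current.getCanonicalName)
      current = current.getSuperclass
    }
    found
  }

  // Run a block and rethrow recognized client exceptions as the wrapped type,
  // mirroring HiveCatalog.withClient (minus the synchronization).
  def withClient[T](body: => T): T = {
    try {
      body
    } catch {
      case NonFatal(e) if isClientException(e) =>
        throw new WrappedAnalysisError(e.getClass.getCanonicalName + ": " + e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      withClient { throw new FakeClientException("no such table") }
    } catch {
      case e: WrappedAnalysisError => println("wrapped: " + e.getMessage)
    }
  }
}
```

The point of the superclass walk is that subclasses of the registered exception types get wrapped as well, without the code ever pattern matching on the concrete class.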

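Similarly, because Hive's drop-partitions call exposes no `ignoreIfNotExists` flag, `HiveCatalog.dropPartitions` probes each requested partition and silently filters out the ones that do not exist. Below is a rough standalone sketch of just that filtering step; the in-memory `existing` set, the `lookupPartition` helper, and the `Spec` alias are illustrative stand-ins for the real metastore lookup and partition spec type.

```
object DropPartitionsSketch {
  // A partition spec here is a map from partition column name to value,
  // e.g. Map("ds" -> "2016-02-21"); this loosely mirrors TablePartitionSpec.
  type Spec = Map[String, String]

  def main(args: Array[String]): Unit = {
    // Hypothetical "metastore" state and lookup, standing in for the per-partition
    // getPartition call in the catalog.
    val existing: Set[Spec] = Set(Map("ds" -> "2016-02-21"), Map("ds" -> "2016-02-22"))
    def lookupPartition(spec: Spec): Spec =
      if (existing.contains(spec)) spec
      else throw new NoSuchElementException(s"no such partition: $spec")

    val requested: Seq[Spec] = Seq(Map("ds" -> "2016-02-21"), Map("ds" -> "1999-01-01"))
    val ignoreIfNotExists = true

    // Probe each partition and keep only the ones that exist, mirroring the
    // filter inside HiveCatalog.dropPartitions.
    val partsToDrop: Seq[Spec] =
      if (ignoreIfNotExists) {
        requested.filter { spec =>
          try { lookupPartition(spec); true }
          catch { case _: NoSuchElementException => false }
        }
      } else {
        requested
      }

    // Only Map("ds" -> "2016-02-21") survives the filter.
    println(s"would drop: $partsToDrop")
  }
}
```

As the comment in the patch notes, the real version pays one synchronous Hive call per requested partition for this probe, which is why it is described as somewhat expensive.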