Repository: spark
Updated Branches:
  refs/heads/master 7eb83fefd -> 6c3832b26
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 752c037..5801051 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.Logging import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.ParseUtils._ @@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SparkQl import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.types._ import org.apache.spark.sql.AnalysisException @@ -55,7 +55,7 @@ private[hive] case object NativePlaceholder extends LogicalPlan { } private[hive] case class CreateTableAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean) extends UnaryNode with Command { @@ -63,14 +63,14 @@ private[hive] case class CreateTableAsSelect( override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && tableDesc.schema.nonEmpty && - tableDesc.serde.isDefined && - tableDesc.inputFormat.isDefined && - tableDesc.outputFormat.isDefined && + tableDesc.storage.serde.isDefined && + tableDesc.storage.inputFormat.isDefined && + tableDesc.storage.outputFormat.isDefined && childrenResolved } private[hive] case class CreateViewAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, replace: Boolean, @@ -193,7 +193,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging view: ASTNode, viewNameParts: ASTNode, query: ASTNode, - schema: Seq[HiveColumn], + schema: Seq[CatalogColumn], properties: Map[String, String], allowExist: Boolean, replace: Boolean): CreateViewAsSelect = { @@ -201,18 +201,20 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val originalText = query.source - val tableDesc = HiveTable( + val tableDesc = CatalogTable( specifiedDatabase = dbName, name = viewName, + tableType = CatalogTableType.VIRTUAL_VIEW, schema = schema, - partitionColumns = Seq.empty[HiveColumn], + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = Map.empty[String, String] + ), properties = properties, - serdeProperties = Map[String, String](), - tableType = VirtualView, - location = None, - inputFormat = None, - outputFormat = None, - serde = None, + viewOriginalText = Some(originalText), viewText = Some(originalText)) // We need to keep the original SQL string so that if `spark.sql.nativeView` is @@ -314,8 +316,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val schema = maybeColumns.map { cols => // We can't specify column types when create view, so fill it with null first, and // update it after the schema has been resolved later. 
- nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null)) - }.getOrElse(Seq.empty[HiveColumn]) + nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null)) + }.getOrElse(Seq.empty[CatalogColumn]) val properties = scala.collection.mutable.Map.empty[String, String] @@ -369,19 +371,23 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts) // TODO add bucket support - var tableDesc: HiveTable = HiveTable( + var tableDesc: CatalogTable = CatalogTable( specifiedDatabase = dbName, name = tblName, - schema = Seq.empty[HiveColumn], - partitionColumns = Seq.empty[HiveColumn], - properties = Map[String, String](), - serdeProperties = Map[String, String](), - tableType = if (externalTable.isDefined) ExternalTable else ManagedTable, - location = None, - inputFormat = None, - outputFormat = None, - serde = None, - viewText = None) + tableType = + if (externalTable.isDefined) { + CatalogTableType.EXTERNAL_TABLE + } else { + CatalogTableType.MANAGED_TABLE + }, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = Map.empty[String, String] + ), + schema = Seq.empty[CatalogColumn]) // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.) val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) @@ -392,9 +398,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) } - hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f))) - hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f))) - hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f))) + tableDesc = tableDesc.withNewStorage( + inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat), + outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat), + serde = hiveSerDe.serde.orElse(tableDesc.storage.serde)) children.collect { case list @ Token("TOK_TABCOLLIST", _) => @@ -440,13 +447,13 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging // TODO support the nullFormat case _ => assert(false) } - tableDesc = tableDesc.copy( - serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala) case Token("TOK_TABLELOCATION", child :: Nil) => val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text)) - tableDesc = tableDesc.copy(location = Option(location)) + tableDesc = tableDesc.withNewStorage(locationUri = Option(location)) case Token("TOK_TABLESERIALIZER", child :: Nil) => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( serde = Option(unescapeSQLString(child.children.head.text))) if (child.numChildren == 2) { // This is based on the readProps(..) 
method in @@ -459,59 +466,59 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging .orNull (unescapeSQLString(prop), value) }.toMap - tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams) } case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) => child.text.toLowerCase(Locale.ENGLISH) match { case "orc" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) } case "parquet" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) } case "rcfile" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy(serde = - Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( + serde = + Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) } case "textfile" => - tableDesc = tableDesc.copy( - inputFormat = - Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + tableDesc = tableDesc.withNewStorage( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) case "sequencefile" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) case "avro" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")) } @@ -522,23 +529,21 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging case Token("TOK_TABLESERIALIZER", Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) => - tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName))) + tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName))) otherProps match { case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil => - tableDesc = tableDesc.copy( - serdeProperties = tableDesc.serdeProperties 
++ getProperties(list)) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list)) case _ => } case Token("TOK_TABLEPROPERTIES", list :: Nil) => tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list)) case list @ Token("TOK_TABLEFILEFORMAT", _) => - tableDesc = tableDesc.copy( - inputFormat = - Option(unescapeSQLString(list.children.head.text)), - outputFormat = - Option(unescapeSQLString(list.children(1).text))) + tableDesc = tableDesc.withNewStorage( + inputFormat = Option(unescapeSQLString(list.children.head.text)), + outputFormat = Option(unescapeSQLString(list.children(1).text))) case Token("TOK_STORAGEHANDLER", _) => throw new AnalysisException( "CREATE TABLE AS SELECT cannot be used for a non-native table") @@ -678,15 +683,15 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging // This is based the getColumns methods in // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java - protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[HiveColumn] = { + protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = { node.children.map(_.children).collect { case Token(rawColName, Nil) :: colTypeNode :: comment => - val colName = if (!lowerCase) rawColName - else rawColName.toLowerCase - HiveColumn( - cleanIdentifier(colName), - nodeToTypeString(colTypeNode), - comment.headOption.map(n => unescapeSQLString(n.text)).orNull) + val colName = if (!lowerCase) rawColName else rawColName.toLowerCase + CatalogColumn( + name = cleanIdentifier(colName), + dataType = nodeToTypeString(colTypeNode), + nullable = true, + comment.headOption.map(n => unescapeSQLString(n.text))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f681cc6..6a0a089 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -18,67 +18,11 @@ package org.apache.spark.sql.hive.client import java.io.PrintStream -import java.util.{Map => JMap} -import javax.annotation.Nullable -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression -private[hive] case class HiveDatabase(name: String, location: String) - -private[hive] abstract class TableType { val name: String } -private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" } -private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" } -private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" } -private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" } - -// TODO: Use this for Tables and Partitions -private[hive] case class HiveStorageDescriptor( - location: String, - inputFormat: String, - outputFormat: String, - serde: String, - serdeProperties: Map[String, String]) - -private[hive] case class HivePartition( - values: Seq[String], - storage: 
HiveStorageDescriptor) - -private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String) -private[hive] case class HiveTable( - specifiedDatabase: Option[String], - name: String, - schema: Seq[HiveColumn], - partitionColumns: Seq[HiveColumn], - properties: Map[String, String], - serdeProperties: Map[String, String], - tableType: TableType, - location: Option[String] = None, - inputFormat: Option[String] = None, - outputFormat: Option[String] = None, - serde: Option[String] = None, - viewText: Option[String] = None) { - - @transient - private[client] var client: HiveClient = _ - - private[client] def withClient(ci: HiveClient): this.type = { - client = ci - this - } - - def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) - - def isPartitioned: Boolean = partitionColumns.nonEmpty - - def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) - - def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = - client.getPartitionsByFilter(this, predicates) - - // Hive does not support backticks when passing names to the client. - def qualifiedName: String = s"$database.$name" -} /** * An externally visible interface to the Hive client. This interface is shared across both the @@ -106,6 +50,9 @@ private[hive] trait HiveClient { /** Returns the names of all tables in the given database. */ def listTables(dbName: String): Seq[String] + /** Returns the names of tables in the given database that matches the given pattern. */ + def listTables(dbName: String, pattern: String): Seq[String] + /** Returns the name of the active database. */ def currentDatabase: String @@ -113,46 +60,133 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - def getDatabase(name: String): HiveDatabase = { - getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) + final def getDatabase(name: String): CatalogDatabase = { + getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) } /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[HiveDatabase] + def getDatabaseOption(name: String): Option[CatalogDatabase] + + /** List the names of all the databases that match the specified pattern. */ + def listDatabases(pattern: String): Seq[String] /** Returns the specified table, or throws [[NoSuchTableException]]. */ - def getTable(dbName: String, tableName: String): HiveTable = { - getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException) + final def getTable(dbName: String, tableName: String): CatalogTable = { + getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName)) } - /** Returns the metadata for the specified table or None if it doens't exist. */ - def getTableOption(dbName: String, tableName: String): Option[HiveTable] + /** Returns the metadata for the specified table or None if it doesn't exist. */ + def getTableOption(dbName: String, tableName: String): Option[CatalogTable] /** Creates a view with the given metadata. */ - def createView(view: HiveTable): Unit + def createView(view: CatalogTable): Unit /** Updates the given view with new metadata. */ - def alertView(view: HiveTable): Unit + def alertView(view: CatalogTable): Unit /** Creates a table with the given metadata. 
*/ - def createTable(table: HiveTable): Unit + def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit - /** Updates the given table with new metadata. */ - def alterTable(table: HiveTable): Unit + /** Drop the specified table. */ + def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit + + /** Alter a table whose name matches the one specified in `table`, assuming it exists. */ + final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table) + + /** Updates the given table with new metadata, optionally renaming the table. */ + def alterTable(tableName: String, table: CatalogTable): Unit /** Creates a new database with the given name. */ - def createDatabase(database: HiveDatabase): Unit + def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit + + /** + * Drop the specified database, if it exists. + * + * @param name database to drop + * @param ignoreIfNotExists if true, do not throw error if the database does not exist + * @param cascade whether to remove all associated objects such as tables and functions + */ + def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit + + /** + * Alter a database whose name matches the one specified in `database`, assuming it exists. + */ + def alterDatabase(database: CatalogDatabase): Unit + + /** + * Create one or many partitions in the given table. + */ + def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit + + /** + * Drop one or many partitions in the given table. + * + * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the + * partitions do not already exist. The seemingly relevant flag `ifExists` in + * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere. + */ + def dropPartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec]): Unit - /** Returns the specified paritition or None if it does not exist. */ + /** + * Rename one or many existing table partitions, assuming they exist. + */ + def renamePartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec], + newSpecs: Seq[Catalog.TablePartitionSpec]): Unit + + /** + * Alter one or more table partitions whose specs match the ones specified in `newParts`, + * assuming the partitions exist. + */ + def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit + + /** Returns the specified partition, or throws [[NoSuchPartitionException]]. */ + final def getPartition( + dbName: String, + tableName: String, + spec: Catalog.TablePartitionSpec): CatalogTablePartition = { + getPartitionOption(dbName, tableName, spec).getOrElse { + throw new NoSuchPartitionException(dbName, tableName, spec) + } + } + + /** Returns the specified partition or None if it does not exist. */ + final def getPartitionOption( + db: String, + table: String, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = { + getPartitionOption(getTable(db, table), spec) + } + + /** Returns the specified partition or None if it does not exist. */ def getPartitionOption( - hTable: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] + table: CatalogTable, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] + + /** Returns all partitions for the given table. 
*/
+  final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
+    getAllPartitions(getTable(db, table))
+  }
  /** Returns all partitions for the given table. */
-  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+  def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
  /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+  def getPartitionsByFilter(
+      table: CatalogTable,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition]
  /** Loads a static partition into an existing table. */
  def loadPartition(
@@ -181,6 +215,29 @@ private[hive] trait HiveClient {
      holdDDLTime: Boolean,
      listBucketingEnabled: Boolean): Unit
+  /** Create a function in an existing database. */
+  def createFunction(db: String, func: CatalogFunction): Unit
+
+  /** Drop an existing function in the database. */
+  def dropFunction(db: String, name: String): Unit
+
+  /** Rename an existing function in the database. */
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
+  /** Alter a function whose name matches the one specified in `func`, assuming it exists. */
+  def alterFunction(db: String, func: CatalogFunction): Unit
+
+  /** Return an existing function in the database, assuming it exists. */
+  final def getFunction(db: String, name: String): CatalogFunction = {
+    getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name))
+  }
+
+  /** Return an existing function in the database, or None if it doesn't exist. */
+  def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+
+  /** Return the names of all functions that match the given pattern in the database. */
+  def listFunctions(db: String, pattern: String): Seq[String]
+
  /** Add a jar into class loader */
  def addJar(path: String): Unit
@@ -192,4 +249,5 @@ private[hive] trait HiveClient {
  /** Used for testing only. Removes all metadata from this instance of Hive.
*/ def reset(): Unit + } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index cf1ff55..7a007d2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -18,24 +18,25 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} -import java.util.{Map => JMap} import scala.collection.JavaConverters._ import scala.language.reflectiveCalls import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.{TableType => HTableType} -import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema} -import org.apache.hadoop.hive.ql.{metadata, Driver} -import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri} +import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{Logging, SparkConf, SparkException} -import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.util.{CircularBuffer, Utils} @@ -234,167 +235,184 @@ private[hive] class HiveClientImpl( if (getDatabaseOption(databaseName).isDefined) { state.setCurrentDatabase(databaseName) } else { - throw new NoSuchDatabaseException + throw new NoSuchDatabaseException(databaseName) } } - override def createDatabase(database: HiveDatabase): Unit = withHiveState { + override def createDatabase( + database: CatalogDatabase, + ignoreIfExists: Boolean): Unit = withHiveState { client.createDatabase( - new Database( + new HiveDatabase( database.name, - "", - new File(database.location).toURI.toString, - new java.util.HashMap), - true) + database.description, + database.locationUri, + database.properties.asJava), + ignoreIfExists) } - override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState { + override def dropDatabase( + name: String, + ignoreIfNotExists: Boolean, + cascade: Boolean): Unit = withHiveState { + client.dropDatabase(name, true, ignoreIfNotExists, cascade) + } + + override def alterDatabase(database: CatalogDatabase): Unit = withHiveState { + client.alterDatabase( + database.name, + new HiveDatabase( + database.name, + database.description, + database.locationUri, + database.properties.asJava)) + } + + override def getDatabaseOption(name: String): Option[CatalogDatabase] = withHiveState { Option(client.getDatabase(name)).map { d 
=> - HiveDatabase( + CatalogDatabase( name = d.getName, - location = d.getLocationUri) + description = d.getDescription, + locationUri = d.getLocationUri, + properties = d.getParameters.asScala.toMap) } } + override def listDatabases(pattern: String): Seq[String] = withHiveState { + client.getDatabasesByPattern(pattern).asScala.toSeq + } + override def getTableOption( dbName: String, - tableName: String): Option[HiveTable] = withHiveState { - + tableName: String): Option[CatalogTable] = withHiveState { logDebug(s"Looking up $dbName.$tableName") - - val hiveTable = Option(client.getTable(dbName, tableName, false)) - val converted = hiveTable.map { h => - - HiveTable( - name = h.getTableName, + Option(client.getTable(dbName, tableName, false)).map { h => + CatalogTable( specifiedDatabase = Option(h.getDbName), - schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)), - partitionColumns = h.getPartCols.asScala.map(f => - HiveColumn(f.getName, f.getType, f.getComment)), - properties = h.getParameters.asScala.toMap, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap, + name = h.getTableName, tableType = h.getTableType match { - case HTableType.MANAGED_TABLE => ManagedTable - case HTableType.EXTERNAL_TABLE => ExternalTable - case HTableType.VIRTUAL_VIEW => VirtualView - case HTableType.INDEX_TABLE => IndexTable + case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE + case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE + case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE + case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW }, - location = shim.getDataLocation(h), - inputFormat = Option(h.getInputFormatClass).map(_.getName), - outputFormat = Option(h.getOutputFormatClass).map(_.getName), - serde = Option(h.getSerializationLib), - viewText = Option(h.getViewExpandedText)).withClient(this) + schema = h.getCols.asScala.map(fromHiveColumn), + partitionColumns = h.getPartCols.asScala.map(fromHiveColumn), + sortColumns = Seq(), + numBuckets = h.getNumBuckets, + createTime = h.getTTable.getCreateTime.toLong * 1000, + lastAccessTime = h.getLastAccessTime.toLong * 1000, + storage = CatalogStorageFormat( + locationUri = shim.getDataLocation(h), + inputFormat = Option(h.getInputFormatClass).map(_.getName), + outputFormat = Option(h.getOutputFormatClass).map(_.getName), + serde = Option(h.getSerializationLib), + serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap + ), + properties = h.getParameters.asScala.toMap, + viewOriginalText = Option(h.getViewOriginalText), + viewText = Option(h.getViewExpandedText)) } - converted } - private def toInputFormat(name: String) = - Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] - - private def toOutputFormat(name: String) = - Utils.classForName(name) - .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - private def toQlTable(table: HiveTable): metadata.Table = { - val qlTable = new metadata.Table(table.database, table.name) - - qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - qlTable.setPartCols( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } - table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } - - // set owner - qlTable.setOwner(conf.getUser) - // set create time - 
qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - table.location.foreach { loc => shim.setDataLocation(qlTable, loc) } - table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) - table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) - table.serde.foreach(qlTable.setSerializationLib) - - qlTable + override def createView(view: CatalogTable): Unit = withHiveState { + client.createTable(toHiveViewTable(view)) } - private def toViewTable(view: HiveTable): metadata.Table = { - // TODO: this is duplicated with `toQlTable` except the table type stuff. - val tbl = new metadata.Table(view.database, view.name) - tbl.setTableType(HTableType.VIRTUAL_VIEW) - tbl.setSerializationLib(null) - tbl.clearSerDeInfo() - - // TODO: we will save the same SQL string to original and expanded text, which is different - // from Hive. - tbl.setViewOriginalText(view.viewText.get) - tbl.setViewExpandedText(view.viewText.get) - - tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - view.properties.foreach { case (k, v) => tbl.setProperty(k, v) } - - // set owner - tbl.setOwner(conf.getUser) - // set create time - tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - tbl + override def alertView(view: CatalogTable): Unit = withHiveState { + client.alterTable(view.qualifiedName, toHiveViewTable(view)) } - override def createView(view: HiveTable): Unit = withHiveState { - client.createTable(toViewTable(view)) + override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { + client.createTable(toHiveTable(table), ignoreIfExists) } - override def alertView(view: HiveTable): Unit = withHiveState { - client.alterTable(view.qualifiedName, toViewTable(view)) + override def dropTable( + dbName: String, + tableName: String, + ignoreIfNotExists: Boolean): Unit = withHiveState { + client.dropTable(dbName, tableName, true, ignoreIfNotExists) } - override def createTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.createTable(qlTable) + override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { + val hiveTable = toHiveTable(table) + // Do not use `table.qualifiedName` here because this may be a rename + val qualifiedTableName = s"${table.database}.$tableName" + client.alterTable(qualifiedTableName, hiveTable) } - override def alterTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.alterTable(table.qualifiedName, qlTable) + override def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = withHiveState { + val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) + parts.foreach { s => + addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) + } + client.createPartitions(addPartitionDesc) + } + + override def dropPartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState { + // TODO: figure out how to drop multiple partitions in one call + specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) } + } + + override def renamePartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec], + newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState { + require(specs.size == newSpecs.size, "number of old and new partition specs differ") + val catalogTable = getTable(db, 
table) + val hiveTable = toHiveTable(catalogTable) + specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => + val hivePart = getPartitionOption(catalogTable, oldSpec) + .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } + .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) } + client.renamePartition(hiveTable, oldSpec.asJava, hivePart) + } } - private def toHivePartition(partition: metadata.Partition): HivePartition = { - val apiPartition = partition.getTPartition - HivePartition( - values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), - storage = HiveStorageDescriptor( - location = apiPartition.getSd.getLocation, - inputFormat = apiPartition.getSd.getInputFormat, - outputFormat = apiPartition.getSd.getOutputFormat, - serde = apiPartition.getSd.getSerdeInfo.getSerializationLib, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + override def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit = withHiveState { + val hiveTable = toHiveTable(getTable(db, table)) + client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } override def getPartitionOption( - table: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState { - - val qlTable = toQlTable(table) - val qlPartition = client.getPartition(qlTable, partitionSpec, false) - Option(qlPartition).map(toHivePartition) + table: CatalogTable, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + val hivePartition = client.getPartition(hiveTable, spec.asJava, false) + Option(hivePartition).map(fromHivePartition) } - override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getAllPartitions(client, qlTable).map(toHivePartition) + override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + shim.getAllPartitions(client, hiveTable).map(fromHivePartition) } override def getPartitionsByFilter( - hTable: HiveTable, - predicates: Seq[Expression]): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition) + table: CatalogTable, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) } override def listTables(dbName: String): Seq[String] = withHiveState { client.getAllTables(dbName).asScala } + override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState { + client.getTablesByPattern(dbName, pattern).asScala + } + /** * Runs the specified SQL query using Hive. 
*/ @@ -508,6 +526,34 @@ private[hive] class HiveClientImpl( listBucketingEnabled) } + override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState { + client.createFunction(toHiveFunction(func, db)) + } + + override def dropFunction(db: String, name: String): Unit = withHiveState { + client.dropFunction(db, name) + } + + override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState { + val catalogFunc = getFunction(db, oldName).copy(name = newName) + val hiveFunc = toHiveFunction(catalogFunc, db) + client.alterFunction(db, oldName, hiveFunc) + } + + override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState { + client.alterFunction(db, func.name, toHiveFunction(func, db)) + } + + override def getFunctionOption( + db: String, + name: String): Option[CatalogFunction] = withHiveState { + Option(client.getFunction(db, name)).map(fromHiveFunction) + } + + override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState { + client.getFunctions(db, pattern).asScala + } + def addJar(path: String): Unit = { val uri = new Path(path).toUri val jarURL = if (uri.getScheme == null) { @@ -541,4 +587,97 @@ private[hive] class HiveClientImpl( client.dropDatabase(db, true, false, true) } } + + + /* -------------------------------------------------------- * + | Helper methods for converting to and from Hive classes | + * -------------------------------------------------------- */ + + private def toInputFormat(name: String) = + Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Utils.classForName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { + new HiveFunction( + f.name, + db, + f.className, + null, + PrincipalType.USER, + (System.currentTimeMillis / 1000).toInt, + FunctionType.JAVA, + List.empty[ResourceUri].asJava) + } + + private def fromHiveFunction(hf: HiveFunction): CatalogFunction = { + new CatalogFunction(hf.getFunctionName, hf.getClassName) + } + + private def toHiveColumn(c: CatalogColumn): FieldSchema = { + new FieldSchema(c.name, c.dataType, c.comment.orNull) + } + + private def fromHiveColumn(hc: FieldSchema): CatalogColumn = { + new CatalogColumn( + name = hc.getName, + dataType = hc.getType, + nullable = true, + comment = Option(hc.getComment)) + } + + private def toHiveTable(table: CatalogTable): HiveTable = { + val hiveTable = new HiveTable(table.database, table.name) + hiveTable.setTableType(table.tableType match { + case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE + case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE + case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE + case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW + }) + hiveTable.setFields(table.schema.map(toHiveColumn).asJava) + hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava) + // TODO: set sort columns here too + hiveTable.setOwner(conf.getUser) + hiveTable.setNumBuckets(table.numBuckets) + hiveTable.setCreateTime((table.createTime / 1000).toInt) + hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) + table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) + 
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) + table.storage.serde.foreach(hiveTable.setSerializationLib) + table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } + table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } + table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) } + table.viewText.foreach { t => hiveTable.setViewExpandedText(t) } + hiveTable + } + + private def toHiveViewTable(view: CatalogTable): HiveTable = { + val tbl = toHiveTable(view) + tbl.setTableType(HiveTableType.VIRTUAL_VIEW) + tbl.setSerializationLib(null) + tbl.clearSerDeInfo() + tbl + } + + private def toHivePartition( + p: CatalogTablePartition, + ht: HiveTable): HivePartition = { + new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull) + } + + private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { + val apiPartition = hp.getTPartition + CatalogTablePartition( + spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), + storage = CatalogStorageFormat( + locationUri = Option(apiPartition.getSd.getLocation), + inputFormat = Option(apiPartition.getSd.getInputFormat), + outputFormat = Option(apiPartition.getSd.getOutputFormat), + serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), + serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 4c0aae6..3f81c99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} -import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} /** * Create table and insert the query result into it. 
@@ -33,7 +33,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} */ private[hive] case class CreateTableAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, query: LogicalPlan, allowExisting: Boolean) extends RunnableCommand { @@ -51,25 +51,25 @@ case class CreateTableAsSelect( import org.apache.hadoop.mapred.TextInputFormat val withFormat = - tableDesc.copy( + tableDesc.withNewStorage( inputFormat = - tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), + tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), outputFormat = - tableDesc.outputFormat + tableDesc.storage.outputFormat .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName()))) + serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName))) val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS // However we don't think SparkSQL should follow that. - tableDesc.copy(schema = - query.output.map(c => - HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null))) + tableDesc.copy(schema = query.output.map { c => + CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType)) + }) } else { withFormat } - hiveContext.catalog.client.createTable(withSchema) + hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation hiveContext.catalog.lookupRelation(tableIdentifier, None) match { http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 5da58a7..2914d03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -21,11 +21,11 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder} -import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} +import org.apache.spark.sql.hive.{ HiveContext, HiveMetastoreTypes, SQLBuilder} /** * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of @@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different // from Hive and may not work for some cases like create view on self join. 
private[hive] case class CreateViewAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, orReplace: Boolean) extends RunnableCommand { @@ -72,7 +72,7 @@ private[hive] case class CreateViewAsSelect( Seq.empty[Row] } - private def prepareTable(sqlContext: SQLContext): HiveTable = { + private def prepareTable(sqlContext: SQLContext): CatalogTable = { val expandedText = if (sqlContext.conf.canonicalView) { try rebuildViewQueryString(sqlContext) catch { case NonFatal(e) => wrapViewTextWithSelect @@ -83,12 +83,16 @@ private[hive] case class CreateViewAsSelect( val viewSchema = { if (tableDesc.schema.isEmpty) { - childSchema.map { attr => - HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null) + childSchema.map { a => + CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType)) } } else { - childSchema.zip(tableDesc.schema).map { case (attr, col) => - HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment) + childSchema.zip(tableDesc.schema).map { case (a, col) => + CatalogColumn( + col.name, + HiveMetastoreTypes.toMetastoreType(a.dataType), + nullable = true, + col.comment) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index feb133d..d316664 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -205,7 +205,7 @@ case class InsertIntoHiveTable( val oldPart = catalog.client.getPartitionOption( catalog.client.getTable(table.databaseName, table.tableName), - partitionSpec.asJava) + partitionSpec) if (oldPart.isEmpty || !ifNotExists) { catalog.client.loadPartition( http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala new file mode 100644 index 0000000..f73e7e2 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.util.VersionInfo + +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader} +import org.apache.spark.util.Utils + + +/** + * Test suite for the [[HiveCatalog]]. + */ +class HiveCatalogSuite extends CatalogTestCases { + + private val client: HiveClient = { + IsolatedClientLoader.forVersion( + hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hadoopVersion = VersionInfo.getVersion).createClient() + } + + protected override val tableInputFormat: String = + "org.apache.hadoop.mapred.SequenceFileInputFormat" + protected override val tableOutputFormat: String = + "org.apache.hadoop.mapred.SequenceFileOutputFormat" + + protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + + protected override def resetState(): Unit = client.reset() + + protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client) + +} http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 14a83d5..f8764d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{QueryTest, Row, SaveMode, SQLConf} -import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} @@ -83,16 +83,16 @@ class DataSourceWithHiveMetastoreCatalogSuite } val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(!hiveTable.isPartitioned) - assert(hiveTable.tableType === ManagedTable) + assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string")) + assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string")) checkAnswer(table("t"), testDF) assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) @@ -114,16 +114,17 @@ class DataSourceWithHiveMetastoreCatalogSuite } val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.tableType === ExternalTable) - 
assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator)) + assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE) + assert(hiveTable.storage.locationUri === + Some(path.toURI.toString.stripSuffix(File.separator))) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string")) + assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string")) checkAnswer(table("t"), testDF) assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) @@ -143,17 +144,16 @@ class DataSourceWithHiveMetastoreCatalogSuite """.stripMargin) val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.isPartitioned === false) - assert(hiveTable.tableType === ExternalTable) - assert(hiveTable.partitionColumns.length === 0) + assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("int", "string")) + assert(columns.map(_.dataType) === Seq("int", "string")) checkAnswer(table("t"), Row(1, "val_1")) assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1")) http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 137dadd..e869c0e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -22,15 +22,15 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.SimpleParserConf import org.apache.spark.sql.catalyst.plans.logical.Generate -import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable} class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val parser = new HiveQl(SimpleParserConf()) - private def extractTableDesc(sql: String): (HiveTable, Boolean) = { + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting) }.head @@ -53,28 +53,29 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |AS SELECT * FROM src""".stripMargin val (desc, exists) = extractTableDesc(s1) - assert(exists == true) + assert(exists) assert(desc.specifiedDatabase == Some("mydb")) assert(desc.name == "page_view") - assert(desc.tableType == ExternalTable) - assert(desc.location == Some("/user/external/page_view")) + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == - HiveColumn("viewtime", 
"int", null) :: - HiveColumn("userid", "bigint", null) :: - HiveColumn("page_url", "string", null) :: - HiveColumn("referrer_url", "string", null) :: - HiveColumn("ip", "string", "IP Address of the User") :: - HiveColumn("country", "string", "country of origination") :: Nil) + CatalogColumn("viewtime", "int") :: + CatalogColumn("userid", "bigint") :: + CatalogColumn("page_url", "string") :: + CatalogColumn("referrer_url", "string") :: + CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) assert(desc.partitionColumns == - HiveColumn("dt", "string", "date type") :: - HiveColumn("hour", "string", "hour of the day") :: Nil) - assert(desc.serdeProperties == + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.storage.serdeProperties == Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054"))) - assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == + Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -98,27 +99,27 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |AS SELECT * FROM src""".stripMargin val (desc, exists) = extractTableDesc(s2) - assert(exists == true) + assert(exists) assert(desc.specifiedDatabase == Some("mydb")) assert(desc.name == "page_view") - assert(desc.tableType == ExternalTable) - assert(desc.location == Some("/user/external/page_view")) + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == - HiveColumn("viewtime", "int", null) :: - HiveColumn("userid", "bigint", null) :: - HiveColumn("page_url", "string", null) :: - HiveColumn("referrer_url", "string", null) :: - HiveColumn("ip", "string", "IP Address of the User") :: - HiveColumn("country", "string", "country of origination") :: Nil) + CatalogColumn("viewtime", "int") :: + CatalogColumn("userid", "bigint") :: + CatalogColumn("page_url", "string") :: + CatalogColumn("referrer_url", "string") :: + CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) assert(desc.partitionColumns == - HiveColumn("dt", "string", "date type") :: - HiveColumn("hour", "string", "hour of the day") :: Nil) - assert(desc.serdeProperties == Map()) - assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat")) - assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat")) - assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe")) + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = 
Some("hour of the day")) :: Nil) + assert(desc.storage.serdeProperties == Map()) + assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) + assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) + assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -128,14 +129,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { assert(exists == false) assert(desc.specifiedDatabase == None) assert(desc.name == "page_view") - assert(desc.tableType == ManagedTable) - assert(desc.location == None) - assert(desc.schema == Seq.empty[HiveColumn]) + assert(desc.tableType == CatalogTableType.MANAGED_TABLE) + assert(desc.storage.locationUri == None) + assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText - assert(desc.serdeProperties == Map()) - assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - assert(desc.serde.isEmpty) + assert(desc.storage.serdeProperties == Map()) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + assert(desc.storage.serde.isEmpty) assert(desc.properties == Map()) } @@ -162,14 +164,14 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { assert(exists == false) assert(desc.specifiedDatabase == None) assert(desc.name == "ctas2") - assert(desc.tableType == ManagedTable) - assert(desc.location == None) - assert(desc.schema == Seq.empty[HiveColumn]) + assert(desc.tableType == CatalogTableType.MANAGED_TABLE) + assert(desc.storage.locationUri == None) + assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText - assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) - assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) + assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d9e4b02..0c288bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import 
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d9e4b02..0c288bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -724,20 +724,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val tableName = "spark6655"
     withTable(tableName) {
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
-      val hiveTable = HiveTable(
+      val hiveTable = CatalogTable(
         specifiedDatabase = Some("default"),
         name = tableName,
+        tableType = CatalogTableType.MANAGED_TABLE,
         schema = Seq.empty,
-        partitionColumns = Seq.empty,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = Map(
+            "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+        ),
         properties = Map(
           "spark.sql.sources.provider" -> "json",
           "spark.sql.sources.schema" -> schema.json,
-          "EXTERNAL" -> "FALSE"),
-        tableType = ManagedTable,
-        serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
+          "EXTERNAL" -> "FALSE"))
-      catalog.client.createTable(hiveTable)
+      catalog.client.createTable(hiveTable, ignoreIfExists = false)
       invalidateTable(tableName)
       val actualSchema = table(tableName).schema
@@ -916,7 +921,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
     // each column of the table is of native type StringType.
     assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == StringType))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
     catalog.createDataSourceTable(
       tableIdent = TableIdentifier("skip_hive_metadata"),
@@ -930,6 +935,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
     // the table has a column type as array of StringType.
     assert(catalog.client.getTable("default", "skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == ArrayType(StringType)))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
   }
 }
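The spark6655 hunk above shows the new construction pattern end to end: table type and storage descriptor are supplied when the CatalogTable is built, and createTable now takes an explicit ignoreIfExists flag. A condensed sketch under those assumptions, suitable for a Spark REPL on this branch (the table name "example" and the client value are placeholders, not anything this patch introduces):

import org.apache.spark.sql.catalyst.catalog._

// Sketch only: a managed table whose storage descriptor carries the serde properties.
val table = CatalogTable(
  specifiedDatabase = Some("default"),
  name = "example",
  tableType = CatalogTableType.MANAGED_TABLE,
  schema = Seq(CatalogColumn("key", "int")),
  storage = CatalogStorageFormat(
    locationUri = None,
    inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
    outputFormat = None,
    serde = None,
    serdeProperties = Map.empty))

// With a metastore client in scope (placeholder here), creation is now explicit
// about what to do when the table already exists:
// client.createTable(table, ignoreIfExists = false)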
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index c2c896e..488f298 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -26,9 +26,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
-    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName
+    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
-    assert(metastoreTable.serdeProperties("path") === expectedPath)
+    assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
   test(s"saveAsTable() to non-default database - with USE - Overwrite") {
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1344a2c..d850d52 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveContext
@@ -60,8 +61,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hadoopVersion = VersionInfo.getVersion,
       config = buildConf(),
       ivyPath = ivyPath).createClient()
-    val db = new HiveDatabase("default", "")
-    badClient.createDatabase(db)
+    val db = new CatalogDatabase("default", "desc", "loc", Map())
+    badClient.createDatabase(db, ignoreIfExists = true)
   }
   private def getNestedMessages(e: Throwable): String = {
@@ -116,29 +117,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
     test(s"$version: createDatabase") {
-      val db = HiveDatabase("default", "")
-      client.createDatabase(db)
+      val db = CatalogDatabase("default", "desc", "loc", Map())
+      client.createDatabase(db, ignoreIfExists = true)
     }
     test(s"$version: createTable") {
       val table =
-        HiveTable(
+        CatalogTable(
           specifiedDatabase = Option("default"),
           name = "src",
-          schema = Seq(HiveColumn("key", "int", "")),
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat =
-            Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
-          outputFormat =
-            Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
-          serde =
-            Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()))
-
-      client.createTable(table)
+          tableType = CatalogTableType.MANAGED_TABLE,
+          schema = Seq(CatalogColumn("key", "int")),
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
+            outputFormat = Some(
+              classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+            serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
+            serdeProperties = Map.empty
+          ))
+
+      client.createTable(table, ignoreIfExists = false)
     }
     test(s"$version: getTable") {
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index b91248b..37c0179 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
     val (actualScannedColumns, actualPartValues) = plan.collect {
       case p @ HiveTableScan(columns, relation, _) =>
         val columnNames = columns.map(_.name)
-        val partValues = if (relation.table.isPartitioned) {
+        val partValues = if (relation.table.partitionColumns.nonEmpty) {
           p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
         } else {
           Seq.empty

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
