Repository: spark Updated Branches: refs/heads/master 6c29b3de7 -> 2fb12b0a3
[SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of hive client ## What changes were proposed in this pull request? `HiveExternalCatalog` should be the only interface to talk to the hive metastore. In `MetastoreRelation` we can just use `ExternalCatalog` instead of `HiveClient` to interact with hive metastore, and add missing API in `ExternalCatalog`. ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes #15460 from cloud-fan/relation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fb12b0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fb12b0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fb12b0a Branch: refs/heads/master Commit: 2fb12b0a33deeeadfac451095f64dea6c967caac Parents: 6c29b3d Author: Wenchen Fan <[email protected]> Authored: Fri Oct 14 15:53:50 2016 +0800 Committer: Wenchen Fan <[email protected]> Committed: Fri Oct 14 15:53:50 2016 +0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/ExternalCatalog.scala | 13 +++++++++++++ .../sql/catalyst/catalog/InMemoryCatalog.scala | 8 ++++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 8 ++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++---- .../spark/sql/hive/MetastoreRelation.scala | 19 ++++++++++++------- .../org/apache/spark/sql/hive/TableReader.scala | 3 +-- .../spark/sql/hive/client/HiveClient.scala | 15 +++------------ .../spark/sql/hive/client/HiveClientImpl.scala | 10 ++++++---- .../sql/hive/HiveExternalCatalogSuite.scala | 9 +++++++++ .../spark/sql/hive/MetastoreRelationSuite.scala | 2 +- .../spark/sql/hive/client/VersionsSuite.scala | 4 ++-- 11 files changed, 66 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index dd93b46..348d3d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException} +import org.apache.spark.sql.catalyst.expressions.Expression /** @@ -196,6 +197,18 @@ abstract class ExternalCatalog { table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] + /** + * List the metadata of selected partitions according to the given partition predicates. 
+ * + * @param db database name + * @param table table name + * @param predicates partition predicates + */ + def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] + // -------------------------------------------------------------------------- // Functions // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 3e31127..49280f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.StringUtils /** @@ -477,6 +478,13 @@ class InMemoryCatalog( catalog(db).tables(table).partitions.values.toSeq } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = { + throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.") + } + // -------------------------------------------------------------------------- // Functions // -------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 237b829..b5d93c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils} import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap @@ -646,6 +647,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartitions(db, table, partialSpec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = { + client.getPartitionsByFilter(db, table, predicates) + } + // -------------------------------------------------------------------------- // Functions // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8410a2e..c44f0ad 
100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -44,8 +44,6 @@ import org.apache.spark.sql.types._ */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - private val client = - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) @@ -104,7 +102,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString + val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri + new Path(new Path(dbLocation), tblName).toString } def lookupRelation( @@ -129,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } else { val qualifiedTable = MetastoreRelation( - qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession) + qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession) alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 33f0ecf..da809cf 
100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -43,7 +43,6 @@ private[hive] case class MetastoreRelation( databaseName: String, tableName: String) (val catalogTable: CatalogTable, - @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation { @@ -59,7 +58,7 @@ private[hive] case class MetastoreRelation( Objects.hashCode(databaseName, tableName, output) } - override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: client :: sparkSession :: Nil + override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil private def toHiveColumn(c: StructField): FieldSchema = { new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull) @@ -146,11 +145,18 @@ private[hive] case class MetastoreRelation( // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 - private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable) + private lazy val allPartitions: Seq[CatalogTablePartition] = { + sparkSession.sharedState.externalCatalog.listPartitions( + catalogTable.database, + catalogTable.identifier.table) + } def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { - client.getPartitionsByFilter(catalogTable, predicates) + sparkSession.sharedState.externalCatalog.listPartitionsByFilter( + catalogTable.database, + catalogTable.identifier.table, + predicates) } else { allPartitions } @@ -234,8 +240,7 @@ private[hive] case class MetastoreRelation( val columnOrdinals = AttributeMap(attributes.zipWithIndex) override def inputFiles: Array[String] = { - val partLocations = client - 
.getPartitionsByFilter(catalogTable, Nil) + val partLocations = allPartitions .flatMap(_.storage.locationUri) .toArray if (partLocations.nonEmpty) { @@ -248,6 +253,6 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName)(catalogTable, client, sparkSession) + MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 2a54163..aaf30f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -149,8 +149,7 @@ class HadoopTableReader( * subdirectory of each partition being read. If None, then all files are accepted. 
*/ def makeRDDForPartitionedTable( - partitionToDeserializer: Map[HivePartition, - Class[_ <: Deserializer]], + partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], filterOpt: Option[PathFilter]): RDD[InternalRow] = { // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 984d23b..9ee3d62 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -172,24 +172,15 @@ private[hive] trait HiveClient { * Returns the partitions for the given table that match the supplied partition spec. * If no partition spec is specified, all partitions are returned. */ - final def getPartitions( + def getPartitions( db: String, table: String, - partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = { - getPartitions(getTable(db, table), partialSpec) - } - - /** - * Returns the partitions for the given table that match the supplied partition spec. - * If no partition spec is specified, all partitions are returned. - */ - def getPartitions( - table: CatalogTable, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] /** Returns partitions filtered by predicates for the given table. */ def getPartitionsByFilter( - table: CatalogTable, + db: String, + table: String, predicates: Seq[Expression]): Seq[CatalogTablePartition] /** Loads a static partition into an existing table. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index dd33d75..5c8f7ff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -525,9 +525,10 @@ private[hive] class HiveClientImpl( * If no partition spec is specified, all partitions are returned. */ override def getPartitions( - table: CatalogTable, + db: String, + table: String, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(getTable(db, table)) spec match { case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition) @@ -535,9 +536,10 @@ private[hive] class HiveClientImpl( } override def getPartitionsByFilter( - table: CatalogTable, + db: String, + table: String, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(getTable(db, table)) shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) } http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 26c2549..efa0beb 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.dsl.expressions._ /** * Test suite for the [[HiveExternalCatalog]]. @@ -43,4 +44,12 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { externalCatalog.client.reset() } + import utils._ + + test("list partitions by filter") { + val catalog = newBasicCatalog() + val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1)) + assert(selectedPartitions.length == 1) + assert(selectedPartitions.head.spec == part1.spec) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala index 2f3055d..c28e41a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala @@ -29,7 +29,7 @@ class MetastoreRelationSuite extends SparkFunSuite { tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, schema = StructType(StructField("a", IntegerType, true) :: Nil)) - val relation = MetastoreRelation("db", "test")(table, null, null) + val relation = MetastoreRelation("db", "test")(table, null) // No exception should be thrown relation.makeCopy(Array("db", "test")) http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 9a10957..c158bf1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -295,12 +295,12 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: getPartitions(catalogTable)") { - assert(2 == client.getPartitions(client.getTable("default", "src_part")).size) + assert(2 == client.getPartitions("default", "src_part").size) } test(s"$version: getPartitionsByFilter") { // Only one partition [1, 1] for key2 == 1 - val result = client.getPartitionsByFilter(client.getTable("default", "src_part"), + val result = client.getPartitionsByFilter("default", "src_part", Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1)))) // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
