Repository: spark
Updated Branches:
  refs/heads/master 407c3cedf -> 84a339990


[SPARK-18028][SQL] simplify TableFileCatalog

## What changes were proposed in this pull request?

Simplify/cleanup TableFileCatalog:

1. pass a `CatalogTable` instead of `databaseName` and `tableName` into 
`TableFileCatalog`, so that we don't need to fetch table metadata from 
metastore again
2. In `TableFileCatalog.filterPartitions0`, DO NOT set 
`PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the 
[classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209),
 the default value of `basePath` already satisfies our need. What's more, if we 
set this parameter, we may break the case 2 which is metioned in the classdoc.
3. add `equals` and `hashCode` to `TableFileCatalog`
4. add `SessionCatalog.listPartitionsByFilter` which handles case sensitivity.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <[email protected]>

Closes #15568 from cloud-fan/table-file-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84a33999
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84a33999
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84a33999

Branch: refs/heads/master
Commit: 84a33999082af88ea6365cdb5c7232ed0933b1c6
Parents: 407c3ce
Author: Wenchen Fan <[email protected]>
Authored: Tue Oct 25 08:42:21 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Oct 25 08:42:21 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 14 +++++
 .../datasources/TableFileCatalog.scala          | 54 ++++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  4 +-
 .../spark/sql/hive/CachedTableSuite.scala       | 41 ++++++++++++++-
 .../PruneFileSourcePartitionsSuite.scala        |  7 +--
 5 files changed, 84 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84a33999/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9711131..3d6eec8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -756,6 +756,20 @@ class SessionCatalog(
   }
 
   /**
+   * List the metadata of partitions that belong to the specified table, 
assuming it exists, that
+   * satisfy the given partition-pruning predicate expressions.
+   */
+  def listPartitionsByFilter(
+      tableName: TableIdentifier,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    val db = 
formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableName.table)
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Option(db)))
+    externalCatalog.listPartitionsByFilter(db, table, predicates)
+  }
+
+  /**
    * Verify if the input partition spec exactly matches the existing defined 
partition spec
    * The columns must be the same but the orders could be different.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/84a33999/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 31a01bc..667379b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -20,36 +20,30 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
 
 
 /**
  * A [[FileCatalog]] for a metastore catalog table.
  *
  * @param sparkSession a [[SparkSession]]
- * @param db the table's database name
- * @param table the table's (unqualified) name
- * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param table the metadata of the table
  * @param sizeInBytes the table's data size in bytes
- * @param fileStatusCache optional cache implementation to use for file listing
  */
 class TableFileCatalog(
     sparkSession: SparkSession,
-    db: String,
-    table: String,
-    partitionSchema: Option[StructType],
+    val table: CatalogTable,
     override val sizeInBytes: Long) extends FileCatalog {
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
   private val fileStatusCache = FileStatusCache.newCache(sparkSession)
 
-  private val externalCatalog = sparkSession.sharedState.externalCatalog
+  assert(table.identifier.database.isDefined,
+    "The table identifier must be qualified in TableFileCatalog")
 
-  private val catalogTable = externalCatalog.getTable(db, table)
-
-  private val baseLocation = catalogTable.storage.locationUri
+  private val baseLocation = table.storage.locationUri
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
@@ -66,24 +60,32 @@ class TableFileCatalog(
    * @param filters partition-pruning filters
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
-    val parameters = baseLocation
-      .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
-      .getOrElse(Map.empty)
-    partitionSchema match {
-      case Some(schema) =>
-        val selectedPartitions = externalCatalog.listPartitionsByFilter(db, 
table, filters)
-        val partitions = selectedPartitions.map { p =>
-          PartitionPath(p.toRow(schema), p.storage.locationUri.get)
-        }
-        val partitionSpec = PartitionSpec(schema, partitions)
-        new PrunedTableFileCatalog(
-          sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec)
-      case None =>
-        new ListingFileCatalog(sparkSession, rootPaths, parameters, None, 
fileStatusCache)
+    if (table.partitionColumnNames.nonEmpty) {
+      val selectedPartitions = 
sparkSession.sessionState.catalog.listPartitionsByFilter(
+        table.identifier, filters)
+      val partitionSchema = table.partitionSchema
+      val partitions = selectedPartitions.map { p =>
+        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+      }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      new PrunedTableFileCatalog(
+        sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec)
+    } else {
+      new ListingFileCatalog(sparkSession, rootPaths, 
table.storage.properties, None)
     }
   }
 
   override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+
+  // `TableFileCatalog` may be a member of `HadoopFsRelation`, 
`HadoopFsRelation` may be a member
+  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. 
So we need to
+  // implement `equals` and `hashCode` here, to make it work with cache lookup.
+  override def equals(o: Any): Boolean = o match {
+    case other: TableFileCatalog => this.table.identifier == 
other.table.identifier
+    case _ => false
+  }
+
+  override def hashCode(): Int = table.identifier.hashCode()
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/84a33999/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4408933..6c1585d 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         Some(partitionSchema))
 
       val logicalRelation = cached.getOrElse {
-        val db = metastoreRelation.databaseName
-        val table = metastoreRelation.tableName
         val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
         val fileCatalog = {
           val catalog = new TableFileCatalog(
-            sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             catalog
           } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/84a33999/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 7d4ef6f..ecdf4f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Utils
 
@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
 
     sql("DROP TABLE cachedTable")
   }
+
+  test("cache a table using TableFileCatalog") {
+    withTable("test") {
+      sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
+      val tableMeta = spark.sharedState.externalCatalog.getTable("default", 
"test")
+      val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+
+      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+        tableMeta.partitionColumnNames.contains(f.name)
+      })
+      val relation = HadoopFsRelation(
+        location = tableFileCatalog,
+        partitionSchema = tableMeta.partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat(),
+        options = Map.empty)(sparkSession = spark)
+
+      val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+      spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
+
+      assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
+
+      val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val sameRelation = HadoopFsRelation(
+        location = sameCatalog,
+        partitionSchema = tableMeta.partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat(),
+        options = Map.empty)(sparkSession = spark)
+      val samePlan = LogicalRelation(sameRelation, catalogTable = 
Some(tableMeta))
+
+      
assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84a33999/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 346ea0c..59639aa 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with 
SQLTestUtils with Te
             |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
 
         val tableMeta = spark.sharedState.externalCatalog.getTable("default", 
"test")
-        val tableFileCatalog = new TableFileCatalog(
-          spark,
-          tableMeta.database,
-          tableMeta.identifier.table,
-          Some(tableMeta.partitionSchema),
-          0)
+        val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
 
         val dataSchema = StructType(tableMeta.schema.filterNot { f =>
           tableMeta.partitionColumnNames.contains(f.name)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to