Repository: spark
Updated Branches:
  refs/heads/branch-1.1 6ffdcc61f -> 7c6afdac8


[SPARK-2783][SQL] Basic support for analyze in HiveContext

JIRA: https://issues.apache.org/jira/browse/SPARK-2783

Author: Yin Huai <[email protected]>

Closes #1741 from yhuai/analyzeTable and squashes the following commits:

7bb5f02 [Yin Huai] Use sql instead of hql.
4d09325 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
analyzeTable
e3ebcd4 [Yin Huai] Renaming.
c170f4e [Yin Huai] Do not use getContentSummary.
62393b6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
analyzeTable
db233a6 [Yin Huai] Trying to debug jenkins...
fee84f0 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
analyzeTable
f0501f3 [Yin Huai] Fix compilation error.
24ad391 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
analyzeTable
8918140 [Yin Huai] Wording.
23df227 [Yin Huai] Add a simple analyze method to get the size of a table and 
update the "totalSize" property of this table in the Hive metastore.

(cherry picked from commit e139e2be60ef23281327744e1b3e74904dfdf63f)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c6afdac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c6afdac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c6afdac

Branch: refs/heads/branch-1.1
Commit: 7c6afdac867d52447221438ed7508123c07d17f8
Parents: 6ffdcc6
Author: Yin Huai <[email protected]>
Authored: Sun Aug 3 14:54:41 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Sun Aug 3 14:55:45 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveContext.scala | 79 ++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  5 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 54 +++++++++++++
 3 files changed, 136 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c6afdac/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index acad681..d8e7a59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -25,10 +25,14 @@ import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.io.TimestampWritable
 
 import org.apache.spark.SparkContext
@@ -107,6 +111,81 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) 
{
     catalog.createTable("default", tableName, 
ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  /**
+   * Analyzes the given table in the current database to generate statistics, which will be
+   * used in query optimizations.
+   *
+   * Right now, it only supports Hive tables and it only updates the size of a Hive table
+   * in the Hive metastore. The size is the sum of the lengths of all files found under the
+   * table's path; it is stored under `StatsSetupConst.TOTAL_SIZE` only when it is positive
+   * and differs from the value already recorded.
+   *
+   * @param tableName name of the table to analyze; it is resolved through the catalog.
+   * @throws NotImplementedError if the table does not resolve to a `MetastoreRelation`.
+   */
+  def analyze(tableName: String) {
+    val relation = catalog.lookupRelation(None, tableName) match {
+      case LowerCaseSchema(r) => r
+      case o => o
+    }
+
+    relation match {
+      case relation: MetastoreRelation => {
+        // This method is mainly based on
+        // 
org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why does fs.getContentSummary return the wrong size on Jenkins?
+        // Can we use fs.getContentSummary in the future?
+        // It seems fs.getContentSummary returns the wrong table size on Jenkins. So 
we use
+        // calculateTableSize to count the table size.
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDir) {
+            fs.listStatus(path).map(status => calculateTableSize(fs, 
status.getPath)).sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+          val path = table.getPath()
+          var size: Long = 0L
+          try {
+            val fs = path.getFileSystem(conf)
+            size = calculateTableSize(fs, path)
+          } catch {
+            case e: Exception =>
+              logWarning(
+                s"Failed to get the size of table ${table.getTableName} in the 
" +
+                s"database ${table.getDbName} because of ${e.toString}", e)
+              size = 0L
+          }
+
+          size
+        }
+
+        val tableParameters = relation.hiveQlTable.getParameters
+        val oldTotalSize =
+          
Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L)
+        val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
+        // Update the Hive metastore if the total size of the table is 
different than the size
+        // recorded in the Hive metastore; a computed size of 0 is never written back.
+        // This logic is based on 
org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          tableParameters.put(StatsSetupConst.TOTAL_SIZE, 
newTotalSize.toString)
+          val hiveTTable = relation.hiveQlTable.getTTable
+          hiveTTable.setParameters(tableParameters)
+          val tableFullName =
+            relation.hiveQlTable.getDbName() + "." + 
relation.hiveQlTable.getTableName()
+
+          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+        }
+      }
+      case otherRelation =>
+        throw new NotImplementedError(
+          s"Analyze has only implemented for Hive tables, " +
+            s"but ${tableName} is a ${otherRelation.nodeName}")
+    }
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed 
when failures occur.
   @transient
   protected lazy val outputBuffer =  new java.io.OutputStream {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c6afdac/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df36044..301cf51 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, 
StorageDescriptor, Ser
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => 
TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.annotation.DeveloperApi
@@ -278,9 +279,9 @@ private[hive] case class MetastoreRelation
       // relatively cheap if parameters for the table are populated into the 
metastore.  An
       // alternative would be going through Hadoop's FileSystem API, which can 
be expensive if a lot
       // of RPCs are involved.  Besides `totalSize`, there are also 
`numFiles`, `numRows`,
-      // `rawDataSize` keys that we can look at in the future.
+      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at 
in the future.
       BigInt(
-        Option(hiveQlTable.getParameters.get("totalSize"))
+        Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE))
           .map(_.toLong)
           .getOrElse(sqlContext.defaultSizeInBytes))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c6afdac/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d8c77d6..bf5931b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,60 @@ import org.apache.spark.sql.hive.test.TestHive._
 
 class StatisticsSuite extends QueryTest {
 
+  test("analyze MetastoreRelations") {
+    def queryTotalSize(tableName: String): BigInt =
+      catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+
+    // Non-partitioned table, populated with two copies of src.
+    sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
+    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+
+    assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
+
+    analyze("analyzeTable")
+
+    assert(queryTotalSize("analyzeTable") === BigInt(11624))
+
+    sql("DROP TABLE analyzeTable").collect()
+
+    // Partitioned table, with one copy of src inserted into each of three partitions.
+    sql(
+      """
+        |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED 
BY (ds STRING)
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+
+    assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)
+
+    analyze("analyzeTable_part")
+
+    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+
+    sql("DROP TABLE analyzeTable_part").collect()
+
+    // Analyzing a temp table is expected to fail with NotImplementedError.
+    sql("""SELECT * FROM src""").registerTempTable("tempTable")
+    intercept[NotImplementedError] {
+      analyze("tempTable")
+    }
+    catalog.unregisterTable(None, "tempTable")
+  }
+
   test("estimates the size of a test MetastoreRelation") {
     val rdd = sql("""SELECT * FROM src""")
     val sizes = rdd.queryExecution.analyzed.collect { case mr: 
MetastoreRelation =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to