Repository: spark
Updated Branches:
  refs/heads/master 29081b587 -> 92da22878


[SPARK-16905] SQL DDL: MSCK REPAIR TABLE

## What changes were proposed in this pull request?

MSCK REPAIR TABLE can be used to recover the partitions in the external catalog 
based on the partitions present in the file system.

An equivalent syntax is: ALTER TABLE table RECOVER PARTITIONS

The implementation in this PR only lists the partition directories (not the files 
within a partition) on the driver (in parallel if needed).
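
For illustration, a minimal usage sketch, assuming a SparkSession with Hive support
and an external, partitioned table named `logs` whose partition directories already
exist on the file system (the table and partition names are hypothetical):

    import org.apache.spark.sql.SparkSession

    object RecoverPartitionsExample {
      def main(args: Array[String]): Unit = {
        // Any existing SparkSession with Hive support works; this one is illustrative.
        val spark = SparkSession.builder()
          .appName("recover-partitions-example")
          .enableHiveSupport()
          .getOrCreate()

        // Assumes partition directories such as <table location>/year=2016/month=08/
        // exist on the file system but are not yet registered in the catalog.
        spark.sql("MSCK REPAIR TABLE logs")
        // Equivalent syntax added by this patch:
        spark.sql("ALTER TABLE logs RECOVER PARTITIONS")
        // The recovered partitions are now visible through the catalog:
        spark.sql("SHOW PARTITIONS logs").show()

        spark.stop()
      }
    }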

## How was this patch tested?

Added unit tests for it and updated the Hive compatibility test suite.

Author: Davies Liu <[email protected]>

Closes #14500 from davies/repair_table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92da2287
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92da2287
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92da2287

Branch: refs/heads/master
Commit: 92da22878bac07545cd946911dcb39a6bb2ee7e8
Parents: 29081b5
Author: Davies Liu <[email protected]>
Authored: Tue Aug 9 10:04:36 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Tue Aug 9 10:04:36 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   6 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  27 +++++
 .../spark/sql/execution/command/ddl.scala       | 118 ++++++++++++++++++-
 .../spark/sql/execution/command/tables.scala    |   2 +-
 .../sql/execution/command/DDLCommandSuite.scala |   8 ++
 .../spark/sql/execution/command/DDLSuite.scala  |  49 ++++++++
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  10 +-
 7 files changed, 211 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index c7d5086..d2b5c53 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -84,6 +84,7 @@ statement
     | ALTER VIEW tableIdentifier
        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
    | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
+    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
    | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
    | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
    | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
@@ -121,6 +122,7 @@ statement
    | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
        tableIdentifier partitionSpec?                                 #loadData
    | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
+    | MSCK REPAIR TABLE tableIdentifier                                #repairTable
    | op=(ADD | LIST) identifier .*?                                   #manageResource
    | SET ROLE .*?                                                     #failNativeCommand
    | SET .*?                                                          #setConfiguration
@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
-    | kw1=MSCK kw2=REPAIR kw3=TABLE
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
@@ -653,7 +654,7 @@ nonReserved
    | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
    | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
    | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
    | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
    | ASC | DESC | LIMIT | RENAME | SETS
    | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
@@ -866,6 +867,7 @@ LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
 REPAIR: 'REPAIR';
+RECOVER: 'RECOVER';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2a452f4..9da2b5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -415,6 +415,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create an [[AlterTableRecoverPartitionsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE tablename
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      "MSCK REPAIR TABLE")
+  }
+
+  /**
    * Convert a table property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
    */
@@ -785,6 +799,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create an [[AlterTableRecoverPartitionsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table RECOVER PARTITIONS;
+   * }}}
+   */
+  override def visitRecoverPartitions(
+      ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
+  }
+
+  /**
    * Create an [[AlterTableSetLocationCommand]] command
    *
    * For example:

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f0e49e6..8fa7615 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,18 +17,23 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.GenSeq
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 
-
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
 
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover partitions in ALTER TABLE: recover all the partitions in the directory of a table
+ * and update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+    tableName: TableIdentifier,
+    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (catalog.isTemporaryTable(tableName)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on temporary tables: $tableName")
+    }
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on datasource tables: $tableName")
+    }
+    if (table.tableType != CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+    }
+    if (!DDLUtils.isTablePartitioned(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+    }
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+    }
+
+    val root = new Path(table.storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    val partitionSpecsAndLocs = scanPartitions(
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+    }
+    spark.sessionState.catalog.createPartitions(tableName,
+      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+      spark: SparkSession,
+      fs: FileSystem,
+      filter: PathFilter,
+      path: Path,
+      spec: TablePartitionSpec,
+      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.length == 0) {
+      return Seq(spec -> path)
+    }
+
+    val statuses = fs.listStatus(path)
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
+      val name = st.getPath.getName
+      if (st.isDirectory && name.contains("=")) {
+        val ps = name.split("=", 2)
+        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        // TODO: Validate the value
+        val value = PartitioningUtils.unescapePathName(ps(1))
+        // compare case-insensitively, but preserve the original case
+        if (columnName == partitionNames(0)) {
+          scanPartitions(
+            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+        } else {
+          logWarning(s"expected partition column ${partitionNames(0)}, but got ${ps(0)}, ignoring it")
+          Seq()
+        }
+      } else {
+        if (name != "_SUCCESS" && name != "_temporary" && 
!name.startsWith(".")) {
+          logWarning(s"ignore ${new Path(path, name)}")
+        }
+        Seq()
+      }
+    }
+  }
+}
+
 
 /**
  * A command that sets the location of a table or a partition.

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6fe9a7..3b10526 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 044fa5f..be1bccb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -540,6 +540,14 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
+  test("alter table: recover partitions") {
+    val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("table_name", None))
+    comparePlans(parsed, expected)
+  }
+
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca9b210..53376c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+      testRecoverPartitions()
+    }
+  }
+
+  test("alter table: recover partition (parallel)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+      testRecoverPartitions()
+    }
+  }
+
+  private def testRecoverPartitions() {
+    val catalog = spark.sessionState.catalog
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+    }
+
+    val tableIdent = TableIdentifier("tab1")
+    createTable(catalog, tableIdent)
+    val part1 = Map("a" -> "1", "b" -> "5")
+    createTablePartition(catalog, part1, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4")) // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("alter table: add partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION 
(b='2')")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92da2287/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 69a6884..54e27b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -499,8 +500,13 @@ class HiveDDLCommandSuite extends PlanTest {
     }
   }
 
-  test("MSCK repair table (not supported)") {
-    assertUnsupported("MSCK REPAIR TABLE tab1")
+  test("MSCK REPAIR table") {
+    val sql = "MSCK REPAIR TABLE tab1"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("tab1", None),
+      "MSCK REPAIR TABLE")
+    comparePlans(parsed, expected)
   }
 
   test("create table like") {

