Repository: spark
Updated Branches:
  refs/heads/branch-2.0 30e6d460d -> f124d35e2


[SPARK-18237][SPARK-18703][SPARK-18675][SQL][BACKPORT-2.0] CTAS for hive serde 
table should work for all hive versions AND Drop Staging Directories and Data 
Files

### What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/15744, 
https://github.com/apache/spark/pull/16104 and 
https://github.com/apache/spark/pull/16134.

----------
[[SPARK-18237][HIVE] hive.exec.stagingdir have no effect](https://github.com/apache/spark/pull/15744)

`hive.exec.stagingdir` has no effect in Spark 2.0.1. Hive confs in 
hive-site.xml are loaded into hadoopConf, so `InsertIntoHiveTable` should read 
them from hadoopConf instead of SessionState.conf.
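
As a rough illustration of the lookup described above, the sketch below models the loaded configuration as a plain `Map`. The names `StagingDirLookup`, `lookup` and `loadedConf` are hypothetical and exist only for this example; the patch itself calls `hadoopConf.get("hive.exec.stagingdir", ".hive-staging")`.

```scala
// Hypothetical stand-in for a configuration object; not the Hadoop API.
object StagingDirLookup {
  // Return the configured value for `key`, or `default` when the key is absent.
  def lookup(conf: Map[String, String], key: String, default: String): String =
    conf.getOrElse(key, default)

  def main(args: Array[String]): Unit = {
    // Assumed example value, as if it had been set in hive-site.xml.
    val loadedConf = Map("hive.exec.stagingdir" -> "/tmp/my-staging")
    println(lookup(loadedConf, "hive.exec.stagingdir", ".hive-staging"))
    // A configuration that never saw hive-site.xml falls back to the default.
    println(lookup(Map.empty, "hive.exec.stagingdir", ".hive-staging"))
  }
}
```

The second call shows the symptom reported here: a configuration that does not contain the user's setting silently returns the default.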

----------
[[SPARK-18675][SQL] CTAS for hive serde table should work for all hive 
versions](https://github.com/apache/spark/pull/16104)

Before Hive 1.1, when inserting into a table, Hive creates the staging 
directory under a common scratch directory. After the write finishes, Hive 
simply empties the table directory and moves the staging directory to it.

Starting with Hive 1.1, Hive creates the staging directory under the table 
directory. When moving the staging directory to the table directory, Hive 
still empties the table directory, but excludes the staging directory there.

In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we 
will always create the staging directory under the table directory, no matter 
what the hive version is. This causes problems if the hive version is prior to 
1.1, because the staging directory will be removed by hive when hive is trying 
to empty the table directory.

This PR copies the code from Hive 0.13, so that there are two branches for 
creating the staging directory. If the Hive version is prior to 1.1, we take 
the old-style branch (i.e. create the staging directory under a common scratch 
directory); otherwise, we take the new-style branch (i.e. create the staging 
directory under the table directory).
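
The branching can be sketched roughly as follows. This is a simplified model, not the patch's code: `HiveVer`, `usesScratchDir` and `stagingParent` are hypothetical names, and the numeric comparison is an assumption made for brevity. The actual change matches on an explicit list of supported Hive versions and throws for anything else.

```scala
object StagingLocation {
  // Hypothetical (major, minor) pair standing in for a Hive version.
  final case class HiveVer(major: Int, minor: Int)

  // True for versions prior to 1.1, which stage under the common scratch directory.
  def usesScratchDir(v: HiveVer): Boolean =
    v.major < 1 || (v.major == 1 && v.minor < 1)

  // Pick the parent directory under which the staging directory is created.
  def stagingParent(v: HiveVer, tableDir: String, scratchDir: String): String =
    if (usesScratchDir(v)) scratchDir else tableDir
}
```

For example, `StagingLocation.stagingParent(StagingLocation.HiveVer(0, 13), "/warehouse/t", "/tmp/hive")` selects the scratch directory, while version 1.2 selects the table directory.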

----------
[[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each 
Insertion/CTAS of Hive serde Tables](https://github.com/apache/spark/pull/16134)

Below are the files/directories generated for three inserts against a Hive 
table:
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```

The first 18 files and directories are temporary. We do not drop them until 
the JVM terminates. If the JVM does not terminate normally, these temporary 
files/directories will not be dropped.

Only the last two files are needed, as shown below.
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```
The temporary files/directories can accumulate quickly when we issue many 
inserts, since each insert generates at least six files. This could consume a 
lot of space and slow down JVM termination. When the JVM does not terminate 
normally, the files might not be dropped.

This PR drops the created staging files and temporary data files after 
each insert/CTAS.
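
The cleanup idea can be sketched on the local file system as below. This is only an illustration under stated assumptions: `StagingCleanup`, `deleteRecursively` and `cleanup` are hypothetical names, and `java.io.File` stands in for the Hadoop `FileSystem` calls that the patch uses. As in the patch, a failure is logged rather than rethrown, since `deleteOnExit` is expected to remain as a fallback.

```scala
import java.io.{File, IOException}
import scala.util.control.NonFatal

object StagingCleanup {
  // Delete `f` and, if it is a directory, everything beneath it.
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      // listFiles() may return null on I/O error, so guard with Option.
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (!f.delete() && f.exists()) {
      throw new IOException("Failed to delete " + f.getAbsolutePath)
    }
  }

  // Best-effort removal of the recorded staging directory, if one was created.
  // Returns false when deletion failed; the caller can carry on either way.
  def cleanup(createdTempDir: Option[File]): Boolean =
    try {
      createdTempDir.foreach(deleteRecursively)
      true
    } catch {
      case NonFatal(e) =>
        System.err.println(s"Unable to delete staging directory: $createdTempDir. $e")
        false
    }
}
```

Recording the directory in an `Option` at creation time keeps the cleanup a no-op when no staging directory was ever created.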

### How was this patch tested?
Added test cases.

Author: gatorsmile <[email protected]>

Closes #16399 from gatorsmile/backport18703&18675ToSpark2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f124d35e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f124d35e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f124d35e

Branch: refs/heads/branch-2.0
Commit: f124d35e267c9b51765be79530b0008b779205a1
Parents: 30e6d46
Author: gatorsmile <[email protected]>
Authored: Mon Dec 26 15:00:22 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Dec 26 15:00:22 2016 +0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 85 +++++++++++++++++---
 .../spark/sql/hive/client/VersionsSuite.scala   | 43 +++++++++-
 2 files changed, 113 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f124d35e/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 6a091aa..368d8f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,8 +24,8 @@ import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -55,7 +55,10 @@ case class InsertIntoHiveTable(
 
   def output: Seq[Attribute] = Seq.empty
 
-  val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+  val hadoopConf = sessionState.newHadoopConf()
+  var createdTempDir: Option[Path] = None
+  val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
@@ -64,7 +67,7 @@ case class InsertIntoHiveTable(
     return executionId
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -82,32 +85,80 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     }
     catch {
       case e: IOException =>
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
     }
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
+  }
+
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = client.version
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table director for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when Hive is trying to empty the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      createdTempDir = Some(dirPath)
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -144,8 +195,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val hadoopConf = sessionState.newHadoopConf()
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
@@ -293,6 +343,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

http://git-wip-us.apache.org/repos/asf/spark/blob/f124d35e/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5b209ac..4922acf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -28,13 +28,15 @@ import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -46,7 +48,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val sparkConf = new SparkConf()
 
@@ -531,5 +533,42 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).map { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
+    // TODO: add more tests.
   }
 }

