Repository: spark
Updated Branches:
  refs/heads/branch-1.4 81b35d864 -> 7edb17bf0


[SPARK-7842] [SQL] Makes task committing/aborting in InsertIntoHadoopFsRelation 
more robust

When committing/aborting a write task issued in `InsertIntoHadoopFsRelation`, 
if an exception is thrown from `OutputWriter.close()`, the committing/aborting 
process will be interrupted, and leaves messy stuff behind (e.g., the 
`_temporary` directory created by `FileOutputCommitter`).

This PR makes these two process more robust by catching potential exceptions 
and falling back to normal task committment/abort.

Author: Cheng Lian <[email protected]>

Closes #6378 from liancheng/spark-7838 and squashes the following commits:

f18253a [Cheng Lian] Makes task committing/aborting in 
InsertIntoHadoopFsRelation more robust

(cherry picked from commit 8af1bf10b70b9b67f18f618174e84365d69caa48)
Signed-off-by: Cheng Lian <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7edb17bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7edb17bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7edb17bf

Branch: refs/heads/branch-1.4
Commit: 7edb17bf078754683981501b8f6c782c6d9a7166
Parents: 81b35d8
Author: Cheng Lian <[email protected]>
Authored: Tue May 26 00:28:47 2015 +0800
Committer: Cheng Lian <[email protected]>
Committed: Tue May 26 00:29:06 2015 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 33 +++++++++++----
 .../spark/sql/sources/SimpleTextRelation.scala  | 42 +++++++++++++++++++-
 .../sql/sources/hadoopFsRelationSuites.scala    | 22 ++++++++++
 3 files changed, 87 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7edb17bf/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c3674a8..fbd98ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -377,13 +377,22 @@ private[sql] class DefaultWriterContainer(
   override def outputWriterForRow(row: Row): OutputWriter = writer
 
   override def commitTask(): Unit = {
-    writer.close()
-    super.commitTask()
+    try {
+      writer.close()
+      super.commitTask()
+    } catch {
+      case cause: Throwable =>
+        super.abortTask()
+        throw new RuntimeException("Failed to commit task", cause)
+    }
   }
 
   override def abortTask(): Unit = {
-    writer.close()
-    super.abortTask()
+    try {
+      writer.close()
+    } finally {
+      super.abortTask()
+    }
   }
 }
 
@@ -422,13 +431,21 @@ private[sql] class DynamicPartitionWriterContainer(
   }
 
   override def commitTask(): Unit = {
-    outputWriters.values.foreach(_.close())
-    super.commitTask()
+    try {
+      outputWriters.values.foreach(_.close())
+      super.commitTask()
+    } catch { case cause: Throwable =>
+      super.abortTask()
+      throw new RuntimeException("Failed to commit task", cause)
+    }
   }
 
   override def abortTask(): Unit = {
-    outputWriters.values.foreach(_.close())
-    super.abortTask()
+    try {
+      outputWriters.values.foreach(_.close())
+    } finally {
+      super.abortTask()
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7edb17bf/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 2d69b89..de90784 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, 
TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
@@ -67,7 +67,9 @@ class SimpleTextOutputWriter(path: String, context: 
TaskAttemptContext) extends
     recordWriter.write(null, new Text(serialized))
   }
 
-  override def close(): Unit = recordWriter.close(context)
+  override def close(): Unit = {
+    recordWriter.close(context)
+  }
 }
 
 /**
@@ -120,3 +122,39 @@ class SimpleTextRelation(
     }
   }
 }
+
+/**
+ * A simple example [[HadoopFsRelationProvider]].
+ */
+class CommitFailureTestSource extends HadoopFsRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    new CommitFailureTestRelation(paths, schema, partitionColumns, 
parameters)(sqlContext)
+  }
+}
+
+class CommitFailureTestRelation(
+    override val paths: Array[String],
+    maybeDataSchema: Option[StructType],
+    override val userDefinedPartitionColumns: Option[StructType],
+    parameters: Map[String, String])(
+    @transient sqlContext: SQLContext)
+  extends SimpleTextRelation(
+    paths, maybeDataSchema, userDefinedPartitionColumns, 
parameters)(sqlContext) {
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = new 
OutputWriterFactory {
+    override def newInstance(
+        path: String,
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter = {
+      new SimpleTextOutputWriter(path, context) {
+        override def close(): Unit = {
+          sys.error("Intentional task commitment failure for testing purpose.")
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7edb17bf/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 3222690..70328e1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.sources
 
 import org.apache.hadoop.fs.Path
+import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
@@ -477,6 +479,26 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest {
   }
 }
 
+class CommitFailureTestRelationSuite extends FunSuite with SQLTestUtils {
+  import TestHive.implicits._
+
+  override val sqlContext = TestHive
+
+  val dataSourceName: String = 
classOf[CommitFailureTestSource].getCanonicalName
+
+  test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+    withTempPath { file =>
+      val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+      intercept[SparkException] {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
+
+      val fs = new 
Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
+}
+
 class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = 
classOf[parquet.DefaultSource].getCanonicalName
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to