Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d554bf48c -> 727944564


[SPARK-7837] [SQL] Avoids double closing output writers when commitTask() fails

When inserting data into a `HadoopFsRelation`, if `commitTask()` of the writer 
container fails, `abortTask()` will be invoked. However, both `commitTask()` 
and `abortTask()` try to close the output writer(s). The problem is that, 
closing underlying writers may not be an idempotent operation. E.g., 
`ParquetRecordWriter.close()` throws NPE when called twice.

Author: Cheng Lian <[email protected]>

Closes #8236 from liancheng/spark-7837/double-closing.

(cherry picked from commit 76c155dd4483d58499e5cb66e5e9373bb771dbeb)
Signed-off-by: Cheng Lian <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72794456
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72794456
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72794456

Branch: refs/heads/branch-1.5
Commit: 727944564d968dbab8352958f44e2209f9d172c3
Parents: d554bf4
Author: Cheng Lian <[email protected]>
Authored: Tue Aug 18 00:59:05 2015 +0800
Committer: Cheng Lian <[email protected]>
Committed: Tue Aug 18 00:59:19 2015 +0800

----------------------------------------------------------------------
 .../execution/datasources/WriterContainer.scala | 21 +++++++--
 .../datasources/parquet/ParquetIOSuite.scala    | 46 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72794456/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f40..427c399 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -217,6 +217,8 @@ private[sql] class DefaultWriterContainer(
     val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, 
taskAttemptContext)
     writer.initConverter(dataSchema)
 
+    var writerClosed = false
+
     // If anything below fails, we should abort the task.
     try {
       while (iterator.hasNext) {
@@ -235,7 +237,10 @@ private[sql] class DefaultWriterContainer(
     def commitTask(): Unit = {
       try {
         assert(writer != null, "OutputWriter instance should have been 
initialized")
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
         super.commitTask()
       } catch {
         case cause: Throwable =>
@@ -247,7 +252,10 @@ private[sql] class DefaultWriterContainer(
 
     def abortTask(): Unit = {
       try {
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
       } finally {
         super.abortTask()
       }
@@ -275,6 +283,8 @@ private[sql] class DynamicPartitionWriterContainer(
     val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
     executorSideSetup(taskContext)
 
+    var outputWritersCleared = false
+
     // Returns the partition key given an input row
     val getPartitionKey = UnsafeProjection.create(partitionColumns, 
inputSchema)
     // Returns the data columns to be written given an input row
@@ -379,8 +389,11 @@ private[sql] class DynamicPartitionWriterContainer(
     }
 
     def clearOutputWriters(): Unit = {
-      outputWriters.asScala.values.foreach(_.close())
-      outputWriters.clear()
+      if (!outputWritersCleared) {
+        outputWriters.asScala.values.foreach(_.close())
+        outputWriters.clear()
+        outputWritersCleared = true
+      }
     }
 
     def commitTask(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/72794456/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d819f3a..e6b0a2e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -424,7 +424,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
 
       configuration.set(
         "spark.sql.parquet.output.committer.class",
-        classOf[BogusParquetOutputCommitter].getCanonicalName)
+        classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
 
       try {
         val message = intercept[SparkException] {
@@ -450,12 +450,54 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
     }.toString
     assert(errorMessage.contains("UnknownHostException"))
   }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    val clonedConf = new Configuration(configuration)
+
+    // Using an output committer that always fails when committing a task, so 
that both
+    // `commitTask()` and `abortTask()` are invoked.
+    configuration.set(
+      "spark.sql.parquet.output.committer.class",
+      classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+
+    try {
+      // Before fixing SPARK-7837, the following code results in an NPE 
because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, 
entry.getValue))
+    }
+  }
 }
 
-class BogusParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
+class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
   override def commitJob(jobContext: JobContext): Unit = {
     sys.error("Intentional exception for testing purposes")
   }
 }
+
+class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to