Repository: spark
Updated Branches:
  refs/heads/branch-1.4 ef42ce613 -> 00efa3ced


[SPARK-11424] Guard against double-close() of RecordReaders (branch-1.4 backport)

This is a branch-1.4 backport of #9382, a fix for SPARK-11424.

Author: Josh Rosen <[email protected]>

Closes #9388 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.4.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00efa3ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00efa3ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00efa3ce

Branch: refs/heads/branch-1.4
Commit: 00efa3cedf07b15d344bf8e476852c36f3047f83
Parents: ef42ce6
Author: Josh Rosen <[email protected]>
Authored: Mon Nov 2 15:56:07 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Mon Nov 2 15:56:07 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 23 +++++++++++-------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 25 +++++++++++++-------
 .../org/apache/spark/util/NextIterator.scala    |  4 +++-
 .../spark/sql/sources/SqlNewHadoopRDD.scala     | 25 +++++++++++++-------
 4 files changed, 50 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0198234..5a2d77d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -257,8 +257,21 @@ class HadoopRDD[K, V](
       }
 
       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -272,12 +285,6 @@ class HadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7c4e4fc..f93407d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -158,8 +158,21 @@ class NewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -173,12 +186,6 @@ class NewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index e5c732a..0b505a5 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index d5aeb46..d62db4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -148,7 +148,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -178,8 +178,21 @@ private[sql] class SqlNewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null;
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -193,12 +206,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to