Repository: spark Updated Branches: refs/heads/branch-1.4 ef42ce613 -> 00efa3ced
[SPARK-11424] Guard against double-close() of RecordReaders (branch-1.4 backport) This is a branch-1.4 backport of #9382, a fix for SPARK-11424. Author: Josh Rosen <[email protected]> Closes #9388 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.4. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00efa3ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00efa3ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00efa3ce Branch: refs/heads/branch-1.4 Commit: 00efa3cedf07b15d344bf8e476852c36f3047f83 Parents: ef42ce6 Author: Josh Rosen <[email protected]> Authored: Mon Nov 2 15:56:07 2015 -0800 Committer: Josh Rosen <[email protected]> Committed: Mon Nov 2 15:56:07 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 23 +++++++++++------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 25 +++++++++++++------- .../org/apache/spark/util/NextIterator.scala | 4 +++- .../spark/sql/sources/SqlNewHadoopRDD.scala | 25 +++++++++++++------- 4 files changed, 50 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 0198234..5a2d77d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -257,8 +257,21 @@ class HadoopRDD[K, V]( } override def close() { - try { - reader.close() + if (reader != null) { + // Close the reader and release it. Note: it's very important that we don't close the + // reader more than once, since that exposes us to MAPREDUCE-5918 when running against + // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic + // corruption issues when reading compressed input. + try { + reader.close() + } catch { + case e: Exception => + if (!ShutdownHookManager.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } finally { + reader = null + } if (bytesReadCallback.isDefined) { inputMetrics.updateBytesRead() } else if (split.inputSplit.value.isInstanceOf[FileSplit] || @@ -272,12 +285,6 @@ class HadoopRDD[K, V]( logWarning("Unable to get input size to set InputMetrics for task", e) } } - } catch { - case e: Exception => { - if (!ShutdownHookManager.inShutdown()) { - logWarning("Exception in RecordReader.close()", e) - } - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 7c4e4fc..f93407d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -128,7 +128,7 @@ class NewHadoopRDD[K, V]( configurable.setConf(conf) case _ => } - val reader = format.createRecordReader( + var reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) @@ -158,8 +158,21 @@ class NewHadoopRDD[K, V]( } private def close() { - try { - reader.close() + if (reader != null) { + // Close the reader and release it. Note: it's very important that we don't close the + // reader more than once, since that exposes us to MAPREDUCE-5918 when running against + // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic + // corruption issues when reading compressed input. + try { + reader.close() + } catch { + case e: Exception => + if (!ShutdownHookManager.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } finally { + reader = null + } if (bytesReadCallback.isDefined) { inputMetrics.updateBytesRead() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || @@ -173,12 +186,6 @@ class NewHadoopRDD[K, V]( logWarning("Unable to get input size to set InputMetrics for task", e) } } - } catch { - case e: Exception => { - if (!ShutdownHookManager.inShutdown()) { - logWarning("Exception in RecordReader.close()", e) - } - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/core/src/main/scala/org/apache/spark/util/NextIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala index e5c732a..0b505a5 100644 --- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala @@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { */ def closeIfNeeded() { if (!closed) { - close() + // Note: it's important that we set closed = true before calling close(), since setting it + // afterwards would permit us to call close() multiple times if close() threw an exception. closed = true + close() } } http://git-wip-us.apache.org/repos/asf/spark/blob/00efa3ce/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala index d5aeb46..d62db4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala @@ -148,7 +148,7 @@ private[sql] class SqlNewHadoopRDD[K, V]( configurable.setConf(conf) case _ => } - val reader = format.createRecordReader( + var reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) @@ -178,8 +178,21 @@ private[sql] class SqlNewHadoopRDD[K, V]( } private def close() { - try { - reader.close() + if (reader != null) { + // Close the reader and release it. Note: it's very important that we don't close the + // reader more than once, since that exposes us to MAPREDUCE-5918 when running against + // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic + // corruption issues when reading compressed input. + try { + reader.close() + } catch { + case e: Exception => + if (!ShutdownHookManager.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } finally { + reader = null; + } if (bytesReadCallback.isDefined) { inputMetrics.updateBytesRead() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || @@ -193,12 +206,6 @@ private[sql] class SqlNewHadoopRDD[K, V]( logWarning("Unable to get input size to set InputMetrics for task", e) } } - } catch { - case e: Exception => { - if (!ShutdownHookManager.inShutdown()) { - logWarning("Exception in RecordReader.close()", e) - } - } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
