Repository: spark
Updated Branches:
  refs/heads/master c399baa0f -> 2089e0e7e
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset

`writer.close` should be put in the `finally` block to avoid potential resource leaks.

JIRA: https://issues.apache.org/jira/browse/SPARK-1482

Author: zsxwing <[email protected]>

Closes #400 from zsxwing/SPARK-1482 and squashes the following commits:

06b197a [zsxwing] SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2089e0e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2089e0e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2089e0e7

Branch: refs/heads/master
Commit: 2089e0e7e7c73656daee7b56f8100332f4d2175c
Parents: c399baa
Author: zsxwing <[email protected]>
Authored: Fri Apr 18 17:49:22 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Fri Apr 18 17:49:22 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2089e0e7/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 343e432..d250bef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
+      try {
+        while (iter.hasNext) {
+          val (k, v) = iter.next()
+          writer.write(k, v)
+        }
+      }
+      finally {
+        writer.close(hadoopContext)
       }
-      writer.close(hadoopContext)
       committer.commitTask(hadoopContext)
       return 1
     }
@@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
 
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
-
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      try {
+        var count = 0
+        while(iter.hasNext) {
+          val record = iter.next()
+          count += 1
+          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        }
+      }
+      finally {
+        writer.close()
       }
-
-      writer.close()
       writer.commit()
     }
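
For reference, below is a minimal, self-contained sketch of the try/finally pattern this commit applies in both save paths. DummyWriter, writePartition, and CloseInFinallyExample are hypothetical names used only for illustration; they are not Spark or Hadoop APIs.

// Sketch of the pattern behind the fix: close the writer in `finally` so the
// resource is released even when writing an element throws.
// DummyWriter is a hypothetical stand-in for a Hadoop RecordWriter, not a real Spark class.
object CloseInFinallyExample {
  class DummyWriter {
    def write(k: String, v: String): Unit = println(s"write($k, $v)")
    def close(): Unit = println("writer closed")
  }

  def writePartition(iter: Iterator[(String, String)]): Unit = {
    val writer = new DummyWriter
    try {
      while (iter.hasNext) {
        val (k, v) = iter.next()
        writer.write(k, v)  // may throw; close() below must still run
      }
    } finally {
      writer.close()        // runs on success and on failure
    }
  }

  def main(args: Array[String]): Unit = {
    writePartition(Iterator("a" -> "1", "b" -> "2"))
  }
}

As in the patch itself, the commit-style calls (committer.commitTask / writer.commit) stay outside the finally block, so a task that failed while writing is not committed.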
