Repository: spark
Updated Branches:
  refs/heads/master 8e674331d -> 25c01c548


[STREAMING] [MINOR] Close files correctly when iterator is finished in 
streaming WAL recovery

Currently the file is never closed after the iteration finishes. Wrap each
reader in a `CompletionIterator` so that it is closed once it is exhausted,
avoiding a resource leak.
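
A minimal sketch of the pattern, not the Spark implementation: `ClosingIterator`
and `FakeReader` are hypothetical stand-ins for `CompletionIterator` and
`FileBasedWriteAheadLogReader`, written here only so the example runs without
Spark on the classpath.

```scala
// Hypothetical stand-in for org.apache.spark.util.CompletionIterator:
// wraps an iterator and runs `completion` once, when the wrapped
// iterator first reports that it has no more elements.
final class ClosingIterator[A](sub: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion()
    }
    more
  }

  override def next(): A = sub.next()
}

// Hypothetical reader that only records whether close() was called.
final class FakeReader(records: Seq[String]) extends Iterator[String] {
  private val underlying = records.iterator
  var closed = false

  override def hasNext: Boolean = underlying.hasNext
  override def next(): String = underlying.next()
  def close(): Unit = { closed = true }
}

object ClosingIteratorDemo {
  // Reads every record from every reader and reports whether all
  // readers were closed by the time the combined iterator was drained.
  def run(): (List[String], Boolean) = {
    val readers = Seq(new FakeReader(Seq("a", "b")), new FakeReader(Seq("c")))
    val all = readers.iterator
      .flatMap(r => new ClosingIterator(r, () => r.close()))
      .toList
    (all, readers.forall(_.closed))
  }
}
```

In this sketch the completion callback fires only when the wrapped iterator is
drained, so a caller that stops reading early would still need to close the
reader some other way.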

Author: jerryshao <[email protected]>

Closes #6050 from jerryshao/close-file-correctly and squashes the following 
commits:

52dfaf5 [jerryshao] Close files correctly when iterator is finished


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25c01c54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25c01c54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25c01c54

Branch: refs/heads/master
Commit: 25c01c54840a9ab768f8b917de7edc2bc2d61b9e
Parents: 8e67433
Author: jerryshao <[email protected]>
Authored: Mon May 11 14:38:58 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon May 11 14:38:58 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25c01c54/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9985fed..87ba4f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -26,7 +26,7 @@ import scala.language.postfixOps
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{CompletionIterator, ThreadUtils}
 import org.apache.spark.{Logging, SparkConf}
 
 /**
@@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog(
 
     logFilesToRead.iterator.map { file =>
       logDebug(s"Creating log reader with $file")
-      new FileBasedWriteAheadLogReader(file, hadoopConf)
+      val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
+      CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
     } flatMap { x => x }
   }
 

