Repository: spark
Updated Branches:
  refs/heads/master c0c0ba6d2 -> 1868bd40d


http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index b63b37d..8317fb9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogReader
+import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
@@ -59,7 +59,7 @@ class ReceivedBlockTrackerSuite
 
   test("block addition, and block to batch allocation") {
     val receivedBlockTracker = createTracker(setCheckpointDir = false)
-    receivedBlockTracker.isLogManagerEnabled should be (false)  // should be disable by default
+    receivedBlockTracker.isWriteAheadLogEnabled should be (false)  // should be disable by default
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
 
     val blockInfos = generateBlockInfos()
@@ -88,7 +88,7 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
-  test("block addition, block to batch allocation and cleanup with write ahead log") {
+  test("block addition, block to batch allocation and clean up with write ahead log") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
@@ -113,11 +113,15 @@ class ReceivedBlockTrackerSuite
       logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
     }
 
-    // Start tracker and add blocks
+    // Set WAL configuration
     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+    require(WriteAheadLogUtils.enableReceiverLog(conf))
+    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+    // Start tracker and add blocks
     val tracker1 = createTracker(clock = manualClock)
-    tracker1.isLogManagerEnabled should be (true)
+    tracker1.isWriteAheadLogEnabled should be (true)
 
     val blockInfos1 = addBlockInfos(tracker1)
     tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
@@ -171,7 +175,7 @@ class ReceivedBlockTrackerSuite
     eventually(timeout(10 seconds), interval(10 millisecond)) {
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
-    printLogFiles("After cleanup")
+    printLogFiles("After clean")
 
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
@@ -192,17 +196,17 @@ class ReceivedBlockTrackerSuite
   test("setting checkpoint dir but not enabling write ahead log") {
     // When WAL config is not set, log manager should not be enabled
     val tracker1 = createTracker(setCheckpointDir = true)
-    tracker1.isLogManagerEnabled should be (false)
+    tracker1.isWriteAheadLogEnabled should be (false)
 
     // When WAL is explicitly disabled, log manager should not be enabled
     conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
     val tracker2 = createTracker(setCheckpointDir = true)
-    tracker2.isLogManagerEnabled should be(false)
+    tracker2.isWriteAheadLogEnabled should be(false)
   }
 
   /**
   * Create tracker object with the optional provided clock. Use fake clock if you
-   * want to control time by manually incrementing it to test log cleanup.
+   * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
       setCheckpointDir: Boolean = true,
@@ -231,7 +235,7 @@ class ReceivedBlockTrackerSuite
   def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
     : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
-      file => new WriteAheadLogReader(file, hadoopConf).toSeq
+      file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
     }.map { byteBuffer =>
       Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
     }.toList
@@ -250,7 +254,7 @@ class ReceivedBlockTrackerSuite
     BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
   }
 
-  /** Create batch cleanup object from the given info */
+  /** Create batch clean object from the given info */
   def createBatchCleanup(time: Long, moreTimes: Long*): BatchCleanupEvent = {
     BatchCleanupEvent((Seq(time) ++ moreTimes).map(Time.apply))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index b84129f..393a360 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -225,7 +225,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       .setAppName(framework)
       .set("spark.ui.enabled", "true")
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-      .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+      .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
     val batchDuration = Milliseconds(500)
     val tempDirectory = Utils.createTempDir()
     val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 58353a5..09440b1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -363,7 +363,7 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
   }
 
   def onStop() {
-    // no cleanup to be done, the receiving thread should stop on it own
+    // no clean to be done, the receiving thread should stop on it own
   }
 }
 
@@ -396,7 +396,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   def onStop() {
     // Simulate slow receiver by waiting for all records to be produced
     while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
-    // no cleanup to be done, the receiving thread should stop on it own
+    // no clean to be done, the receiving thread should stop on it own
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index c3602a5..8b300d8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -21,12 +21,12 @@ import java.io.File
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
-import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
 
 class WriteAheadLogBackedBlockRDDSuite
   extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -100,9 +100,10 @@ class WriteAheadLogBackedBlockRDDSuite
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
-    // Generate write ahead log segments
-    val segments = generateFakeSegments(numPartitionsInBM) ++
-      writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
+    // Generate write ahead log file segments
+    val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
+      generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
+        blockIds.takeRight(numPartitionsInWAL))
 
     // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
     require(
@@ -116,24 +117,24 @@ class WriteAheadLogBackedBlockRDDSuite
 
     // Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
     require(
-      segments.takeRight(numPartitionsInWAL).forall(s =>
+      recordHandles.takeRight(numPartitionsInWAL).forall(s =>
         new File(s.path.stripPrefix("file://")).exists()),
       "Expected blocks not in write ahead log"
     )
     require(
-      segments.take(numPartitionsInBM).forall(s =>
+      recordHandles.take(numPartitionsInBM).forall(s =>
         !new File(s.path.stripPrefix("file://")).exists()),
       "Unexpected blocks in write ahead log"
     )
 
     // Create the RDD and verify whether the returned data is correct
     val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-      segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+      recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
     assert(rdd.collect() === data.flatten)
 
     if (testStoreInBM) {
       val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-        segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+        recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
       assert(rdd2.collect() === data.flatten)
       assert(
         blockIds.forall(blockManager.get(_).nonEmpty),
@@ -142,12 +143,12 @@ class WriteAheadLogBackedBlockRDDSuite
     }
   }
 
-  private def writeLogSegments(
+  private def generateWALRecordHandles(
       blockData: Seq[Seq[String]],
       blockIds: Seq[BlockId]
-    ): Seq[WriteAheadLogFileSegment] = {
+    ): Seq[FileBasedWriteAheadLogSegment] = {
     require(blockData.size === blockIds.size)
-    val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
+    val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
       writer.write(blockManager.dataSerialize(id, data.iterator))
     }
@@ -155,7 +156,7 @@ class WriteAheadLogBackedBlockRDDSuite
     segments
   }
 
-  private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
-    Array.fill(count)(new WriteAheadLogFileSegment("random", 0L, 0))
+  private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
+    Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
   }
 }
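
A rough sketch of the renamed writer and record-handle types this suite now uses, as it might appear inside a test body, assuming only the constructors and write/close calls visible above (the temp-file location is hypothetical, and these classes are streaming-internal):

import java.io.File
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils

val hadoopConf = new Configuration()
val logFile = new File(Utils.createTempDir(), "logFile").toString  // hypothetical temp location

// Each write returns a record handle (a file segment) that a
// WriteAheadLogBackedBlockRDD partition can later use to re-read the block data.
val writer = new FileBasedWriteAheadLogWriter(logFile, hadoopConf)
val handles: Seq[FileBasedWriteAheadLogSegment] =
  Seq("a", "b", "c").map { s => writer.write(ByteBuffer.wrap(s.getBytes("UTF-8"))) }
writer.close()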

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index a3919c4..79098bc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -18,33 +18,38 @@ package org.apache.spark.streaming.util
 
 import java.io._
 import java.nio.ByteBuffer
+import java.util
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.{implicitConversions, postfixOps}
+import scala.reflect.ClassTag
 
-import WriteAheadLogSuite._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.util.{ManualClock, Utils}
-import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.{SparkConf, SparkException}
 
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
+  import WriteAheadLogSuite._
+  
   val hadoopConf = new Configuration()
   var tempDir: File = null
   var testDir: String = null
   var testFile: String = null
-  var manager: WriteAheadLogManager = null
+  var writeAheadLog: FileBasedWriteAheadLog = null
 
   before {
     tempDir = Utils.createTempDir()
     testDir = tempDir.toString
     testFile = new File(tempDir, "testFile").toString
-    if (manager != null) {
-      manager.stop()
-      manager = null
+    if (writeAheadLog != null) {
+      writeAheadLog.close()
+      writeAheadLog = null
     }
   }
 
@@ -52,16 +57,60 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     Utils.deleteRecursively(tempDir)
   }
 
-  test("WriteAheadLogWriter - writing data") {
+  test("WriteAheadLogUtils - log selection and creation") {
+    val logDir = Utils.createTempDir().getAbsolutePath()
+
+    def assertDriverLogClass[T <: WriteAheadLog: ClassTag](conf: SparkConf): WriteAheadLog = {
+      val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
+      assert(log.getClass === implicitly[ClassTag[T]].runtimeClass)
+      log
+    }
+
+    def assertReceiverLogClass[T: ClassTag](conf: SparkConf): WriteAheadLog = {
+      val log = WriteAheadLogUtils.createLogForReceiver(conf, logDir, hadoopConf)
+      assert(log.getClass === implicitly[ClassTag[T]].runtimeClass)
+      log
+    }
+
+    val emptyConf = new SparkConf()  // no log configuration
+    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+    assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
+
+    // Verify setting driver WAL class
+    val conf1 = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
+      classOf[MockWriteAheadLog0].getName())
+    assertDriverLogClass[MockWriteAheadLog0](conf1)
+    assertReceiverLogClass[FileBasedWriteAheadLog](conf1)
+
+    // Verify setting receiver WAL class
+    val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+      classOf[MockWriteAheadLog0].getName())
+    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+    assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
+
+    // Verify setting receiver WAL class with 1-arg constructor
+    val receiverWALConf2 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+      classOf[MockWriteAheadLog1].getName())
+    assertReceiverLogClass[MockWriteAheadLog1](receiverWALConf2)
+
+    // Verify failure setting receiver WAL class with 2-arg constructor
+    intercept[SparkException] {
+      val receiverWALConf3 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+        classOf[MockWriteAheadLog2].getName())
+      assertReceiverLogClass[MockWriteAheadLog1](receiverWALConf3)
+    }
+  }
+
+  test("FileBasedWriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
     val segments = writeDataUsingWriter(testFile, dataToWrite)
     val writtenData = readDataManually(segments)
     assert(writtenData === dataToWrite)
   }
 
-  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+  test("FileBasedWriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(testFile, hadoopConf)
+    val writer = new FileBasedWriteAheadLogWriter(testFile, hadoopConf)
     dataToWrite.foreach { data =>
       val segment = writer.write(stringToByteBuffer(data))
       val dataRead = readDataManually(Seq(segment)).head
@@ -70,10 +119,10 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     writer.close()
   }
 
-  test("WriteAheadLogReader - sequentially reading data") {
+  test("FileBasedWriteAheadLogReader - sequentially reading data") {
     val writtenData = generateRandomData()
     writeDataManually(writtenData, testFile)
-    val reader = new WriteAheadLogReader(testFile, hadoopConf)
+    val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
     assert(readData === writtenData)
     assert(reader.hasNext === false)
@@ -83,14 +132,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     reader.close()
   }
 
-  test("WriteAheadLogReader - sequentially reading data written with writer") {
+  test("FileBasedWriteAheadLogReader - sequentially reading data written with writer") {
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(testFile, dataToWrite)
     val readData = readDataUsingReader(testFile)
     assert(readData === dataToWrite)
   }
 
-  test("WriteAheadLogReader - reading data written with writer after corrupted write") {
+  test("FileBasedWriteAheadLogReader - reading data written with writer after corrupted write") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(testFile, dataToWrite)
@@ -113,38 +162,38 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
   }
 
-  test("WriteAheadLogRandomReader - reading data using random reader") {
+  test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
     val segments = writeDataManually(writtenData, testFile)
 
     // Get a random order of these segments and read them back
     val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+    val reader = new FileBasedWriteAheadLogRandomReader(testFile, hadoopConf)
     writtenDataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
     reader.close()
   }
 
-  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+  test("FileBasedWriteAheadLogRandomReader- reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
     val segments = writeDataUsingWriter(testFile, data)
 
     // Read a random sequence of segments and verify read data
     val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+    val reader = new FileBasedWriteAheadLogRandomReader(testFile, hadoopConf)
     dataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
     reader.close()
   }
 
-  test("WriteAheadLogManager - write rotating logs") {
-    // Write data using manager
+  test("FileBasedWriteAheadLog - write rotating logs") {
+    // Write data with rotation using WriteAheadLog class
     val dataToWrite = generateRandomData()
-    writeDataUsingManager(testDir, dataToWrite)
+    writeDataUsingWriteAheadLog(testDir, dataToWrite)
 
     // Read data manually to verify the written data
     val logFiles = getLogFilesInDirectory(testDir)
@@ -153,8 +202,8 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     assert(writtenData === dataToWrite)
   }
 
-  test("WriteAheadLogManager - read rotating logs") {
-    // Write data manually for testing reading through manager
+  test("FileBasedWriteAheadLog - read rotating logs") {
+    // Write data manually for testing reading through WriteAheadLog
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData()
       val file = testDir + s"/log-$i-$i"
@@ -167,25 +216,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     assert(fileSystem.exists(logDirectoryPath) === true)
 
     // Read data using manager and verify
-    val readData = readDataUsingManager(testDir)
+    val readData = readDataUsingWriteAheadLog(testDir)
     assert(readData === writtenData)
   }
 
-  test("WriteAheadLogManager - recover past logs when creating new manager") {
+  test("FileBasedWriteAheadLog - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
     val dataToWrite = generateRandomData()
-    writeDataUsingManager(testDir, dataToWrite)
+    writeDataUsingWriteAheadLog(testDir, dataToWrite)
     val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    val readData = readDataUsingManager(testDir)
+    val readData = readDataUsingWriteAheadLog(testDir)
     assert(dataToWrite === readData)
   }
 
-  test("WriteAheadLogManager - cleanup old logs") {
+  test("FileBasedWriteAheadLog - clean old logs") {
     logCleanUpTest(waitForCompletion = false)
   }
 
-  test("WriteAheadLogManager - cleanup old logs synchronously") {
+  test("FileBasedWriteAheadLog - clean old logs synchronously") {
     logCleanUpTest(waitForCompletion = true)
   }
 
@@ -193,11 +242,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     // Write data with manager, recover with new manager and verify
     val manualClock = new ManualClock
     val dataToWrite = generateRandomData()
-    manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+    writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
     val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
 
-    manager.cleanupOldLogs(manualClock.getTimeMillis() / 2, waitForCompletion)
+    writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
 
     if (waitForCompletion) {
       assert(getLogFilesInDirectory(testDir).size < logFiles.size)
@@ -208,11 +257,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
-  test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+  test("FileBasedWriteAheadLog - handling file errors while reading rotating logs") {
     // Generate a set of log files
     val manualClock = new ManualClock
     val dataToWrite1 = generateRandomData()
-    writeDataUsingManager(testDir, dataToWrite1, manualClock)
+    writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
     val logFiles1 = getLogFilesInDirectory(testDir)
     assert(logFiles1.size > 1)
 
@@ -220,12 +269,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     // Recover old files and generate a second set of log files
     val dataToWrite2 = generateRandomData()
     manualClock.advance(100000)
-    writeDataUsingManager(testDir, dataToWrite2, manualClock)
+    writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
     val logFiles2 = getLogFilesInDirectory(testDir)
     assert(logFiles2.size > logFiles1.size)
 
     // Read the files and verify that all the written data can be read
-    val readData1 = readDataUsingManager(testDir)
+    val readData1 = readDataUsingWriteAheadLog(testDir)
     assert(readData1 === (dataToWrite1 ++ dataToWrite2))
 
     // Corrupt the first set of files so that they are basically unreadable
@@ -236,25 +285,51 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     }
 
     // Verify that the corrupted files do not prevent reading of the second set of data
-    val readData = readDataUsingManager(testDir)
+    val readData = readDataUsingWriteAheadLog(testDir)
     assert(readData === dataToWrite2)
   }
+
+  test("FileBasedWriteAheadLog - do not create directories or files unless write") {
+    val nonexistentTempPath = File.createTempFile("test", "")
+    nonexistentTempPath.delete()
+    assert(!nonexistentTempPath.exists())
+
+    val writtenSegment = writeDataManually(generateRandomData(), testFile)
+    val wal = new FileBasedWriteAheadLog(
+      new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+    assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
+    wal.read(writtenSegment.head)
+    assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
+  }
 }
 
 object WriteAheadLogSuite {
 
+  class MockWriteAheadLog0() extends WriteAheadLog {
+    override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
+    override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
+    override def readAll(): util.Iterator[ByteBuffer] = { null }
+    override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+    override def close(): Unit = { }
+  }
+
+  class MockWriteAheadLog1(val conf: SparkConf) extends MockWriteAheadLog0()
+
+  class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends MockWriteAheadLog0()
+
+
   private val hadoopConf = new Configuration()
 
   /** Write data to a file directly and return an array of the file segments written. */
-  def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
-    val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+  def writeDataManually(data: Seq[String], file: String): Seq[FileBasedWriteAheadLogSegment] = {
+    val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]()
     val writer = HdfsUtils.getOutputStream(file, hadoopConf)
     data.foreach { item =>
       val offset = writer.getPos
       val bytes = Utils.serialize(item)
       writer.writeInt(bytes.size)
       writer.write(bytes)
-      segments += WriteAheadLogFileSegment(file, offset, bytes.size)
+      segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size)
     }
     writer.close()
     segments
@@ -263,8 +338,11 @@ object WriteAheadLogSuite {
   /**
    * Write data to a file using the writer class and return an array of the file segments written.
    */
-  def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
-    val writer = new WriteAheadLogWriter(filePath, hadoopConf)
+  def writeDataUsingWriter(
+      filePath: String,
+      data: Seq[String]
+    ): Seq[FileBasedWriteAheadLogSegment] = {
+    val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
     val segments = data.map {
       item => writer.write(item)
     }
@@ -272,27 +350,27 @@ object WriteAheadLogSuite {
     segments
   }
 
-  /** Write data to rotating files in log directory using the manager class. */
-  def writeDataUsingManager(
+  /** Write data to rotating files in log directory using the WriteAheadLog class. */
+  def writeDataUsingWriteAheadLog(
       logDirectory: String,
       data: Seq[String],
       manualClock: ManualClock = new ManualClock,
-      stopManager: Boolean = true
-    ): WriteAheadLogManager = {
+      closeLog: Boolean = true
+    ): FileBasedWriteAheadLog = {
     if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
-    val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
-      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    
     // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       manualClock.advance(500)
-      manager.writeToLog(item)
+      wal.write(item, manualClock.getTimeMillis())
     }
-    if (stopManager) manager.stop()
-    manager
+    if (closeLog) wal.close()
+    wal
   }
 
   /** Read data from a segments of a log file directly and return the list of byte buffers. */
-  def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+  def readDataManually(segments: Seq[FileBasedWriteAheadLogSegment]): Seq[String] = {
     segments.map { segment =>
       val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
       try {
@@ -331,18 +409,18 @@ object WriteAheadLogSuite {
 
   /** Read all the data from a log file using reader class and return the list of byte buffers. */
   def readDataUsingReader(file: String): Seq[String] = {
-    val reader = new WriteAheadLogReader(file, hadoopConf)
+    val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
     val readData = reader.toList.map(byteBufferToString)
     reader.close()
     readData
   }
 
-  /** Read all the data in the log file in a directory using the manager class. */
-  def readDataUsingManager(logDirectory: String): Seq[String] = {
-    val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
-      callerName = "WriteAheadLogSuite")
-    val data = manager.readFromLog().map(byteBufferToString).toSeq
-    manager.stop()
+  /** Read all the data in the log file in a directory using the WriteAheadLog class. */
+  def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
+    import scala.collection.JavaConversions._
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    val data = wal.readAll().map(byteBufferToString).toSeq
+    wal.close()
     data
   }
 

