Repository: spark
Updated Branches:
  refs/heads/master 919bf3219 -> 6bc4be64f


[SPARK-14078] Streaming Parquet Based FileSink

This PR adds a new `Sink` implementation that writes out Parquet files.  In 
order to correctly handle partial failures while maintaining exactly-once 
semantics, the files for each batch are written out to a unique directory and 
then atomically appended to a metadata log.  When a Parquet-based `DataSource` 
is initialized for reading, we first check for this log directory and use it 
instead of file listing when present.

Unit tests are added, as well as a stress test that checks the answer after 
non-deterministic injected failures.

Author: Michael Armbrust <[email protected]>

Closes #11897 from marmbrus/fileSink.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bc4be64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bc4be64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bc4be64

Branch: refs/heads/master
Commit: 6bc4be64f86afcb38e4444c80c9400b7b6b745de
Parents: 919bf32
Author: Michael Armbrust <[email protected]>
Authored: Wed Mar 23 13:02:40 2016 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Mar 23 13:03:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/ContinuousQuery.scala  |   9 ++
 .../spark/sql/ContinuousQueryException.scala    |   6 +-
 .../sql/execution/datasources/DataSource.scala  |  64 ++++++++-
 .../execution/streaming/CompositeOffset.scala   |   3 +
 .../execution/streaming/FileStreamSink.scala    |  81 ++++++++++++
 .../execution/streaming/FileStreamSource.scala  |  14 +-
 .../execution/streaming/HDFSMetadataLog.scala   |   7 +-
 .../sql/execution/streaming/LongOffset.scala    |   2 +
 .../sql/execution/streaming/MetadataLog.scala   |   2 +-
 .../execution/streaming/StreamExecution.scala   |  18 +++
 .../execution/streaming/StreamFileCatalog.scala |  59 +++++++++
 .../streaming/HDFSMetadataLogSuite.scala        |   2 +
 .../sql/streaming/FileStreamSinkSuite.scala     |  49 +++++++
 .../spark/sql/streaming/FileStressSuite.scala   | 129 +++++++++++++++++++
 14 files changed, 430 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
index eb69804..1dc9a68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -92,6 +92,15 @@ trait ContinuousQuery {
   def awaitTermination(timeoutMs: Long): Boolean
 
   /**
+   * Blocks until all available data in the source has been processed and 
committed to the sink.
+   * This method is intended for testing. Note that in the case of continually 
arriving data, this
+   * method may block forever.  Additionally, this method is only guaranteed to 
block until data that
+   * has been synchronously appended to a 
[[org.apache.spark.sql.execution.streaming.Source]]
+   * prior to invocation. (i.e. `getOffset` must immediately reflect the 
addition).
+   */
+  def processAllAvailable(): Unit
+
+  /**
    * Stops the execution of this query if it is running. This method blocks 
until the threads
    * performing execution has stopped.
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
index 67dd9db..fec3862 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
@@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, 
StreamExecution}
  */
 @Experimental
 class ContinuousQueryException private[sql](
-    val query: ContinuousQuery,
+    @transient val query: ContinuousQuery,
     val message: String,
     val cause: Throwable,
     val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None
-  ) extends Exception(message, cause) {
+    val endOffset: Option[Offset] = None)
+  extends Exception(message, cause) {
 
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 548da86..c66921f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -22,6 +22,7 @@ import java.util.ServiceLoader
 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
@@ -29,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, 
Source}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -176,14 +177,41 @@ case class DataSource(
 
   /** Returns a sink that can be used to continually write data. */
   def createSink(): Sink = {
-    val datasourceClass = providingClass.newInstance() match {
-      case s: StreamSinkProvider => s
+    providingClass.newInstance() match {
+      case s: StreamSinkProvider => s.createSink(sqlContext, options, 
partitionColumns)
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+
+        new FileStreamSink(sqlContext, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")
     }
+  }
 
-    datasourceClass.createSink(sqlContext, options, partitionColumns)
+  /**
+   * Returns true if there is a single path that has a metadata log indicating 
which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String]): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = 
hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error while looking for metadata directory.")
+            false
+        }
+      case _ => false
+    }
   }
 
   /** Create a resolved [[BaseRelation]] that can be used to read data from 
this [[DataSource]] */
@@ -200,6 +228,34 @@ case class DataSource(
       case (_: RelationProvider, Some(_)) =>
         throw new AnalysisException(s"$className does not allow user-specified 
schemas.")
 
+      // We are reading from the results of a streaming query. Load files from 
the metadata log
+      // instead of listing them using HDFS APIs.
+      case (format: FileFormat, _)
+          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+        val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ 
paths).head)
+        val fileCatalog =
+          new StreamFileCatalog(sqlContext, basePath)
+        val dataSchema = userSpecifiedSchema.orElse {
+          format.inferSchema(
+            sqlContext,
+            caseInsensitiveOptions,
+            fileCatalog.allFiles())
+        }.getOrElse {
+          throw new AnalysisException(
+            s"Unable to infer schema for $format at 
${fileCatalog.allFiles().mkString(",")}. " +
+                "It must be specified manually")
+        }
+
+        HadoopFsRelation(
+          sqlContext,
+          fileCatalog,
+          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          format,
+          options)
+
+      // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index e48ac59..729c846 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) 
extends Offset {
     assert(sources.size == offsets.size)
     new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => 
(s, o) }
   }
+
+  override def toString: String =
+    offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
 }
 
 object CompositeOffset {

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
new file mode 100644
index 0000000..e819e95
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.sources.FileFormat
+
+object FileStreamSink {
+  // The name of the subdirectory that is used to store metadata about which 
files are valid.
+  val metadataDir = "_spark_metadata"
+}
+
+/**
+ * A sink that writes out results to parquet files.  Each batch is written out 
to a unique
+ * directory. After all of the files in a batch have been successfully written, 
the list of
+ * file paths is appended to the log atomically. In the case of partial 
failures, some duplicate
+ * data may be present in the target directory, but only one copy of each file 
will be present
+ * in the log.
+ */
+class FileStreamSink(
+    sqlContext: SQLContext,
+    path: String,
+    fileFormat: FileFormat) extends Sink with Logging {
+
+  private val basePath = new Path(path)
+  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+  private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, 
logPath.toUri.toString)
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (fileLog.get(batchId).isDefined) {
+      logInfo(s"Skipping already committed batch $batchId")
+    } else {
+      val files = writeFiles(data)
+      if (fileLog.add(batchId, files)) {
+        logInfo(s"Committed batch $batchId")
+      } else {
+        logWarning(s"Race while writing batch $batchId")
+      }
+    }
+  }
+
+  /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of 
file paths. */
+  private def writeFiles(data: DataFrame): Seq[String] = {
+    val ctx = sqlContext
+    val outputDir = path
+    val format = fileFormat
+    val schema = data.schema
+
+    val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
+    data.write.parquet(file)
+    sqlContext.read
+        .schema(data.schema)
+        .parquet(file)
+        .inputFiles
+        .map(new Path(_))
+        .filterNot(_.getName.startsWith("_"))
+        .map(_.toUri.toString)
+  }
+
+  override def toString: String = s"FileSink[$path]"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index d13b1a6..1b70055 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -44,7 +44,7 @@ class FileStreamSource(
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   private val seenFiles = new OpenHashSet[String]
-  metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
     files.foreach(seenFiles.add)
   }
 
@@ -114,18 +114,24 @@ class FileStreamSource(
     val endId = end.asInstanceOf[LongOffset].offset
 
     assert(startId <= endId)
-    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
-    logDebug(s"Return files from batches ${startId + 1}:$endId")
+    val files = metadataLog.get(Some(startId + 1), 
Some(endId)).map(_._2).flatten
+    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logDebug(s"Streaming ${files.mkString(", ")}")
     dataFrameBuilder(files)
 
   }
 
   private def fetchAllFiles(): Seq[String] = {
-    fs.listStatus(new Path(path))
+    val startTime = System.nanoTime()
+    val files = fs.listStatus(new Path(path))
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
+    val endTime = System.nanoTime()
+    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
1000000}ms")
+    files
   }
 
   override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.offset == -1)
+
+  override def toString: String = s"FileSource[$path]"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 298b5d2..f27d23b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: 
SQLContext, path: String)
     }
   }
 
-  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
-    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, 
T)] = {
+    val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val batchIds = files
       .map(_.getPath.getName.toLong)
       .filter { batchId =>
-      batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId 
>= startId.get)
     }
     batchIds.sorted.map(batchId => (batchId, 
get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 008195a..bb17640 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset {
 
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+
+  override def toString: String = s"#$offset"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 3f9896d..cc70e1d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -42,7 +42,7 @@ trait MetadataLog[T] {
    * Return metadata for batches between startId (inclusive) and endId 
(inclusive). If `startId` is
    * `None`, just return all batches before endId (inclusive).
    */
-  def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
 
   /**
    * Return the latest batch Id and its metadata if exist.

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 29b058f..5abd7ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -239,6 +239,12 @@ class StreamExecution(
       logInfo(s"Committed offsets for batch $currentBatchId.")
       true
     } else {
+      noNewData = true
+      awaitBatchLock.synchronized {
+        // Wake up any threads that are waiting for the stream to progress.
+        awaitBatchLock.notifyAll()
+      }
+
       false
     }
   }
@@ -334,6 +340,18 @@ class StreamExecution(
     logDebug(s"Unblocked at $newOffset for $source")
   }
 
+  /** A flag to indicate that a batch has completed with no new data 
available. */
+  @volatile private var noNewData = false
+
+  override def processAllAvailable(): Unit = {
+    noNewData = false
+    while (!noNewData) {
+      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
+      if (streamDeathCause != null) { throw streamDeathCause }
+    }
+    if (streamDeathCause != null) { throw streamDeathCause }
+  }
+
   override def awaitTermination(): Unit = {
     if (state == INITIALIZED) {
       throw new IllegalStateException("Cannot wait for termination on a query 
that has not started")

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
new file mode 100644
index 0000000..b8d69b1
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources.{FileCatalog, Partition}
+import org.apache.spark.sql.types.StructType
+
+class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends 
FileCatalog with Logging {
+  val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  logInfo(s"Reading streaming file log from $metadataDirectory")
+  val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, 
metadataDirectory.toUri.toString)
+  val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+  override def paths: Seq[Path] = path :: Nil
+
+  override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), 
Nil)
+
+  /**
+   * Returns all valid files grouped into partitions when the data is 
partitioned. If the data is
+   * unpartitioned, this will return a single partition with no partition 
values.
+   *
+   * @param filters the filters used to prune which partitions are returned.  
These filters must
+   *                only refer to partition columns and this method will only 
return files
+   *                where these predicates are guaranteed to evaluate to 
`true`.  Thus, these
+   *                filters will not need to be evaluated again on the 
returned data.
+   */
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
+    Partition(InternalRow.empty, allFiles()) :: Nil
+
+  override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path)
+
+  override def refresh(): Unit = {}
+
+  override def allFiles(): Seq[FileStatus] = {
+    fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4ddc218..9ed5686 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
+  private implicit def toOption[A](a: A): Option[A] = Option(a)
+
   test("basic") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](sqlContext, 
temp.getAbsolutePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
new file mode 100644
index 0000000..7f31611
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("unpartitioned writing") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpointDir = 
Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    val query =
+      df.write
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir)
+        .startStream(outputDir)
+
+    inputData.addData(1, 2, 3)
+    failAfter(streamingTimeout) { query.processAllAvailable() }
+
+    val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+    checkDataset(
+      outputDf,
+      1, 2, 3)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6bc4be64/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
new file mode 100644
index 0000000..5a1bfb3
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, 
StreamTest}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files.  This test 
consists of
+ * two threads:
+ *  - one that writes out `numRecords` distinct integers to files of random 
sizes (the total
+ *    number of records is fixed but each file's size / creation time is 
random).
+ *  - another that continually restarts a buggy streaming query (i.e. fails 
with 5% probability on
+ *    any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStressSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("fault tolerance stress test") {
+    val numRecords = 10000
+    val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
+    val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    @volatile
+    var continue = true
+    @volatile
+    var stream: ContinuousQuery = null
+
+    val writer = new Thread("stream writer") {
+      override def run(): Unit = {
+        var i = numRecords
+        while (i > 0) {
+          val count = Random.nextInt(100)
+          var j = 0
+          var string = ""
+          while (j < count && i > 0) {
+            if (i % 10000 == 0) { logError(s"Wrote record $i") }
+            string = string + i + "\n"
+            j += 1
+            i -= 1
+          }
+
+          val uuid = UUID.randomUUID().toString
+          val fileName = new File(stagingDir, uuid)
+          stringToFile(fileName, string)
+          fileName.renameTo(new File(inputDir, uuid))
+          val sleep = Random.nextInt(100)
+          Thread.sleep(sleep)
+        }
+
+        logError("== DONE WRITING ==")
+        var done = false
+        while (!done) {
+          try {
+            stream.processAllAvailable()
+            done = true
+          } catch {
+            case NonFatal(_) =>
+          }
+        }
+
+        continue = false
+        stream.stop()
+      }
+    }
+    writer.start()
+
+    val input = sqlContext.read.format("text").stream(inputDir)
+    def startStream(): ContinuousQuery = input
+        .repartition(5)
+        .as[String]
+        .mapPartitions { iter =>
+          val rand = Random.nextInt(100)
+          if (rand < 5) { sys.error("failure") }
+          iter.map(_.toLong)
+        }
+        .write
+        .format("parquet")
+        .option("checkpointLocation", checkpoint)
+        .startStream(outputDir)
+
+    var failures = 0
+    val streamThread = new Thread("stream runner") {
+      while (continue) {
+        if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+        stream = startStream()
+
+        try {
+          stream.awaitTermination()
+        } catch {
+          case ce: ContinuousQueryException =>
+            failures += 1
+        }
+      }
+    }
+
+    streamThread.join()
+
+    logError(s"Stream restarted $failures times.")
+    assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to