Repository: spark Updated Branches: refs/heads/master c7307acda -> 0bad10d3e
[SPARK-22017] Take minimum of all watermark execs in StreamExecution. ## What changes were proposed in this pull request? Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily. ## How was this patch tested? new unit test Author: Jose Torres <j...@databricks.com> Closes #19239 from joseph-torres/SPARK-22017. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bad10d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bad10d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bad10d3 Branch: refs/heads/master Commit: 0bad10d3e36d3238c7ee7c0fc5465072734b3ae4 Parents: c7307ac Author: Jose Torres <j...@databricks.com> Authored: Fri Sep 15 21:10:07 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Sep 15 21:10:07 2017 -0700 ---------------------------------------------------------------------- .../streaming/IncrementalExecution.scala | 2 +- .../execution/streaming/StreamExecution.scala | 39 ++++++++-- .../sql/streaming/EventTimeWatermarkSuite.scala | 78 ++++++++++++++++++++ 3 files changed, 113 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 258a642..19d9598 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -39,7 +39,7 @@ class IncrementalExecution( val checkpointLocation: String, val runId: UUID, val currentBatchId: Long, - offsetSeqMetadata: OffsetSeqMetadata) + val offsetSeqMetadata: OffsetSeqMetadata) extends QueryExecution(sparkSession, logicalPlan) with Logging { // Modified planner with stateful operations. http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 952e431..b27a59b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -130,6 +130,16 @@ class StreamExecution( protected var offsetSeqMetadata = OffsetSeqMetadata( batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf) + /** + * A map of current watermarks, keyed by the position of the watermark operator in the + * physical plan. + * + * This state is 'soft state', which does not affect the correctness and semantics of watermarks + * and is not persisted across query restarts. + * The fault-tolerant watermark state is in offsetSeqMetadata. + */ + protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap() + override val id: UUID = UUID.fromString(streamMetadata.id) override val runId: UUID = UUID.randomUUID @@ -560,13 +570,32 @@ class StreamExecution( } if (hasNewData) { var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs - // Update the eventTime watermark if we find one in the plan. + // Update the eventTime watermarks if we find any in the plan. if (lastExecution != null) { lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => - logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e.eventTimeStats.value.max - e.delayMs - }.headOption.foreach { newWatermarkMs => + case e: EventTimeWatermarkExec => e + }.zipWithIndex.foreach { + case (e, index) if e.eventTimeStats.value.count > 0 => + logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") + val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs + val prevWatermarkMs = watermarkMsMap.get(index) + if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { + watermarkMsMap.put(index, newWatermarkMs) + } + + // Populate 0 if we haven't seen any data yet for this watermark node. + case (_, index) => + if (!watermarkMsMap.isDefinedAt(index)) { + watermarkMsMap.put(index, 0) + } + } + + // Update the global watermark to the minimum of all watermark nodes. + // This is the safest option, because only the global watermark is fault-tolerant. Making + // it the minimum of all individual watermarks guarantees it will never advance past where + // any individual watermark operator would be if it were in a plan by itself. + if(!watermarkMsMap.isEmpty) { + val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 4f19fa0..f3e8cf9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche ) } + test("watermark with 2 streams") { + import org.apache.spark.sql.functions.sum + val first = MemoryStream[Int] + + val firstDf = first.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select('value) + + val second = MemoryStream[Int] + + val secondDf = second.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "5 seconds") + .select('value) + + withTempDir { checkpointDir => + val unionWriter = firstDf.union(secondDf).agg(sum('value)) + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("memory") + .outputMode("complete") + .queryName("test") + + val union = unionWriter.start() + + def getWatermarkAfterData( + firstData: Seq[Int] = Seq.empty, + secondData: Seq[Int] = Seq.empty, + query: StreamingQuery = union): Long = { + if (firstData.nonEmpty) first.addData(firstData) + if (secondData.nonEmpty) second.addData(secondData) + query.processAllAvailable() + // add a dummy batch so lastExecution has the new watermark + first.addData(0) + query.processAllAvailable() + // get last watermark + val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution + lastExecution.offsetSeqMetadata.batchWatermarkMs + } + + // Global watermark starts at 0 until we get data from both sides + assert(getWatermarkAfterData(firstData = Seq(11)) == 0) + assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) + // Global watermark stays at left watermark 1 when right watermark moves to 2 + assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) + // Global watermark switches to right side value 2 when left watermark goes higher + assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) + // Global watermark goes back to left + assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) + // Global watermark stays on left as long as it's below right + assert(getWatermarkAfterData(firstData = Seq(31)) == 21000) + assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) + // Global watermark switches back to right again + assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) + + // Global watermark is updated correctly with simultaneous data from both sides + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) + assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) + assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) + + // Global watermark doesn't decrement with simultaneous data + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) + + // Global watermark recovers after restart, but left side watermark ahead of it does not. + assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000) + union.stop() + val union2 = unionWriter.start() + assert(getWatermarkAfterData(query = union2) == 185000) + // Even though the left side was ahead of 185000 in the last execution, the watermark won't + // increment until it gets past it in this execution. + assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000) + assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000) + } + } + test("complete mode") { val inputData = MemoryStream[Int] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org