Repository: spark Updated Branches: refs/heads/branch-2.1 bc54a14b4 -> 3c8861d92
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years ## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on #16258, not show watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das <[email protected]> Closes #16304 from tdas/SPARK-18834-1. (cherry picked from commit 607a1e63dbc9269b806a9f537e1d041029333cdd) Signed-off-by: Shixiong Zhu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c8861d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c8861d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c8861d9 Branch: refs/heads/branch-2.1 Commit: 3c8861d924e42ff84615044930fc5531201b9b12 Parents: bc54a14 Author: Tathagata Das <[email protected]> Authored: Wed Dec 21 10:44:20 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Wed Dec 21 10:44:27 2016 -0800 ---------------------------------------------------------------------- .../streaming/EventTimeWatermarkExec.scala | 7 +- .../execution/streaming/ProgressReporter.scala | 7 +- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++++++++++++++++++ .../spark/sql/streaming/WatermarkSuite.scala | 240 ---------------- 5 files changed, 299 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d0..5a9a99e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { + val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2386f33..c5e9eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -182,7 +182,10 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { - val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty + val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8f97d95..e05200d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e.eventTimeStats.value.max - e.delay.milliseconds + e.eventTimeStats.value.max - e.delayMs }.headOption.foreach { newWatermarkMs => if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala new file mode 100644 index 0000000..bdfba95 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.text.SimpleDateFormat +import java.util.{Calendar, Date} + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.InternalOutputModes.Complete + +class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging { + + import testImplicits._ + + after { + sqlContext.streams.active.foreach(_.stop()) + } + + test("error on bad column") { + val inputData = MemoryStream[Int].toDF() + val e = intercept[AnalysisException] { + inputData.withWatermark("badColumn", "1 minute") + } + assert(e.getMessage contains "badColumn") + } + + test("error on wrong type") { + val inputData = MemoryStream[Int].toDF() + val e = intercept[AnalysisException] { + inputData.withWatermark("value", "1 minute") + } + assert(e.getMessage contains "value") + assert(e.getMessage contains "int") + } + + test("event time and watermark metrics") { + // No event time metrics when there is no watermarking + val inputData1 = MemoryStream[Int] + val aggWithoutWatermark = inputData1.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(aggWithoutWatermark, outputMode = Complete)( + AddData(inputData1, 15), + CheckAnswer((15, 1)), + assertEventStats { e => assert(e.isEmpty) }, + AddData(inputData1, 10, 12, 14), + CheckAnswer((10, 3), (15, 1)), + assertEventStats { e => assert(e.isEmpty) } + ) + + // All event time metrics where watermarking is set + val inputData2 = MemoryStream[Int] + val aggWithWatermark = inputData2.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(aggWithWatermark)( + AddData(inputData2, 15), + CheckAnswer(), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(15)) + assert(e.get("min") === formatTimestamp(15)) + assert(e.get("avg") === formatTimestamp(15)) + assert(e.get("watermark") === formatTimestamp(0)) + }, + AddData(inputData2, 10, 12, 14), + CheckAnswer(), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(14)) + assert(e.get("min") === formatTimestamp(10)) + assert(e.get("avg") === formatTimestamp(12)) + assert(e.get("watermark") === formatTimestamp(5)) + }, + AddData(inputData2, 25), + CheckAnswer(), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(5)) + }, + AddData(inputData2, 25), + CheckAnswer((10, 3)), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(15)) + } + ) + } + + test("append-mode watermark aggregation") { + val inputData = MemoryStream[Int] + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(windowedAggregation)( + AddData(inputData, 10, 11, 12, 13, 14, 15), + CheckAnswer(), + AddData(inputData, 25), // Advance watermark to 15 seconds + CheckAnswer(), + AddData(inputData, 25), // Evict items less than previous watermark. + CheckAnswer((10, 5)) + ) + } + + test("delay in months and years handled correctly") { + val currentTimeMs = System.currentTimeMillis + val currentTime = new Date(currentTimeMs) + + val input = MemoryStream[Long] + val aggWithWatermark = input.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "2 years 5 months") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth } + + testStream(aggWithWatermark)( + AddData(input, currentTimeMs / 1000), + CheckAnswer(), + AddData(input, currentTimeMs / 1000), + CheckAnswer(), + assertEventStats { e => + assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) + val watermarkTime = timestampFormat.parse(e.get("watermark")) + assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29) + } + ) + } + + test("recovery") { + val inputData = MemoryStream[Int] + val df = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(df)( + AddData(inputData, 10, 11, 12, 13, 14, 15), + CheckLastBatch(), + AddData(inputData, 25), // Advance watermark to 15 seconds + StopStream, + StartStream(), + CheckLastBatch(), + AddData(inputData, 25), // Evict items less than previous watermark. + CheckLastBatch((10, 5)), + StopStream, + AssertOnQuery { q => // clear the sink + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(), + CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10 + AddData(inputData, 30), // Advance watermark to 20 seconds + CheckLastBatch(), + StopStream, + StartStream(), // Watermark should still be 15 seconds + AddData(inputData, 17), + CheckLastBatch(), // We still do not see next batch + AddData(inputData, 30), // Advance watermark to 20 seconds + CheckLastBatch(), + AddData(inputData, 30), // Evict items less than previous watermark. + CheckLastBatch((15, 2)) // Ensure we see next window + ) + } + + test("dropping old data") { + val inputData = MemoryStream[Int] + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(windowedAggregation)( + AddData(inputData, 10, 11, 12), + CheckAnswer(), + AddData(inputData, 25), // Advance watermark to 15 seconds + CheckAnswer(), + AddData(inputData, 25), // Evict items less than previous watermark. + CheckAnswer((10, 3)), + AddData(inputData, 10), // 10 is later than 15 second watermark + CheckAnswer((10, 3)), + AddData(inputData, 25), + CheckAnswer((10, 3)) // Should not emit an incorrect partial result. + ) + } + + test("complete mode") { + val inputData = MemoryStream[Int] + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + // No eviction when asked to compute complete results. + testStream(windowedAggregation, OutputMode.Complete)( + AddData(inputData, 10, 11, 12), + CheckAnswer((10, 3)), + AddData(inputData, 25), + CheckAnswer((10, 3), (25, 1)), + AddData(inputData, 25), + CheckAnswer((10, 3), (25, 2)), + AddData(inputData, 10), + CheckAnswer((10, 4), (25, 2)), + AddData(inputData, 25), + CheckAnswer((10, 4), (25, 3)) + ) + } + + test("group by on raw timestamp") { + val inputData = MemoryStream[Int] + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy($"eventTime") + .agg(count("*") as 'count) + .select($"eventTime".cast("long").as[Long], $"count".as[Long]) + + testStream(windowedAggregation)( + AddData(inputData, 10), + CheckAnswer(), + AddData(inputData, 25), // Advance watermark to 15 seconds + CheckAnswer(), + AddData(inputData, 25), // Evict items less than previous watermark. + CheckAnswer((10, 1)) + ) + } + + private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { + AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } + } + + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3c8861d9/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala deleted file mode 100644 index f1cc19c..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import java.{util => ju} -import java.text.SimpleDateFormat - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.functions.{count, window} - -class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { - - import testImplicits._ - - after { - sqlContext.streams.active.foreach(_.stop()) - } - - test("error on bad column") { - val inputData = MemoryStream[Int].toDF() - val e = intercept[AnalysisException] { - inputData.withWatermark("badColumn", "1 minute") - } - assert(e.getMessage contains "badColumn") - } - - test("error on wrong type") { - val inputData = MemoryStream[Int].toDF() - val e = intercept[AnalysisException] { - inputData.withWatermark("value", "1 minute") - } - assert(e.getMessage contains "value") - assert(e.getMessage contains "int") - } - - - test("event time and watermark metrics") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => - body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) - true - } - - testStream(windowedAggregation)( - AddData(inputData, 15), - CheckAnswer(), - assertEventStats { e => - assert(e.get("max") === formatTimestamp(15)) - assert(e.get("min") === formatTimestamp(15)) - assert(e.get("avg") === formatTimestamp(15)) - assert(e.get("watermark") === formatTimestamp(0)) - }, - AddData(inputData, 10, 12, 14), - CheckAnswer(), - assertEventStats { e => - assert(e.get("max") === formatTimestamp(14)) - assert(e.get("min") === formatTimestamp(10)) - assert(e.get("avg") === formatTimestamp(12)) - assert(e.get("watermark") === formatTimestamp(5)) - }, - AddData(inputData, 25), - CheckAnswer(), - assertEventStats { e => - assert(e.get("max") === formatTimestamp(25)) - assert(e.get("min") === formatTimestamp(25)) - assert(e.get("avg") === formatTimestamp(25)) - assert(e.get("watermark") === formatTimestamp(5)) - }, - AddData(inputData, 25), - CheckAnswer((10, 3)), - assertEventStats { e => - assert(e.get("max") === formatTimestamp(25)) - assert(e.get("min") === formatTimestamp(25)) - assert(e.get("avg") === formatTimestamp(25)) - assert(e.get("watermark") === formatTimestamp(15)) - } - ) - } - - test("append-mode watermark aggregation") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - testStream(windowedAggregation)( - AddData(inputData, 10, 11, 12, 13, 14, 15), - CheckAnswer(), - AddData(inputData, 25), // Advance watermark to 15 seconds - CheckAnswer(), - AddData(inputData, 25), // Evict items less than previous watermark. - CheckAnswer((10, 5)) - ) - } - - test("recovery") { - val inputData = MemoryStream[Int] - val df = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - testStream(df)( - AddData(inputData, 10, 11, 12, 13, 14, 15), - CheckLastBatch(), - AddData(inputData, 25), // Advance watermark to 15 seconds - StopStream, - StartStream(), - CheckLastBatch(), - AddData(inputData, 25), // Evict items less than previous watermark. - CheckLastBatch((10, 5)), - StopStream, - AssertOnQuery { q => // clear the sink - q.sink.asInstanceOf[MemorySink].clear() - true - }, - StartStream(), - CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10 - AddData(inputData, 30), // Advance watermark to 20 seconds - CheckLastBatch(), - StopStream, - StartStream(), // Watermark should still be 15 seconds - AddData(inputData, 17), - CheckLastBatch(), // We still do not see next batch - AddData(inputData, 30), // Advance watermark to 20 seconds - CheckLastBatch(), - AddData(inputData, 30), // Evict items less than previous watermark. - CheckLastBatch((15, 2)) // Ensure we see next window - ) - } - - test("dropping old data") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - testStream(windowedAggregation)( - AddData(inputData, 10, 11, 12), - CheckAnswer(), - AddData(inputData, 25), // Advance watermark to 15 seconds - CheckAnswer(), - AddData(inputData, 25), // Evict items less than previous watermark. - CheckAnswer((10, 3)), - AddData(inputData, 10), // 10 is later than 15 second watermark - CheckAnswer((10, 3)), - AddData(inputData, 25), - CheckAnswer((10, 3)) // Should not emit an incorrect partial result. - ) - } - - test("complete mode") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - // No eviction when asked to compute complete results. - testStream(windowedAggregation, OutputMode.Complete)( - AddData(inputData, 10, 11, 12), - CheckAnswer((10, 3)), - AddData(inputData, 25), - CheckAnswer((10, 3), (25, 1)), - AddData(inputData, 25), - CheckAnswer((10, 3), (25, 2)), - AddData(inputData, 10), - CheckAnswer((10, 4), (25, 2)), - AddData(inputData, 25), - CheckAnswer((10, 4), (25, 3)) - ) - } - - test("group by on raw timestamp") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy($"eventTime") - .agg(count("*") as 'count) - .select($"eventTime".cast("long").as[Long], $"count".as[Long]) - - testStream(windowedAggregation)( - AddData(inputData, 10), - CheckAnswer(), - AddData(inputData, 25), // Advance watermark to 15 seconds - CheckAnswer(), - AddData(inputData, 25), // Evict items less than previous watermark. - CheckAnswer((10, 1)) - ) - } - - private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 - timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) - - private def formatTimestamp(sec: Long): String = { - timestampFormat.format(new ju.Date(sec * 1000)) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
