Repository: spark Updated Branches: refs/heads/master 4cc8d8906 -> 539bb3cf9
[SPARK-18734][SS] Represent timestamp in StreamingQueryProgress as formatted string instead of millis ## What changes were proposed in this pull request? A formatted string (in ISO 8601 format) is easier to read while debugging than a value in milliseconds. ## How was this patch tested? Updated unit tests Author: Tathagata Das <[email protected]> Closes #16166 from tdas/SPARK-18734. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/539bb3cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/539bb3cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/539bb3cf Branch: refs/heads/master Commit: 539bb3cf9573be5cd86e7e6502523ce89c0de170 Parents: 4cc8d89 Author: Tathagata Das <[email protected]> Authored: Tue Dec 6 17:04:26 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Tue Dec 6 17:04:26 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/execution/streaming/ProgressReporter.scala | 8 ++++++-- .../main/scala/org/apache/spark/sql/streaming/progress.scala | 6 +++--- .../sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 8 ++++---- .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 4 files changed, 14 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index d95f552..12d0c1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -17,7 
+17,8 @@ package org.apache.spark.sql.execution.streaming -import java.util.UUID +import java.text.SimpleDateFormat +import java.util.{Date, TimeZone, UUID} import scala.collection.mutable import scala.collection.JavaConverters._ @@ -78,6 +79,9 @@ trait ProgressReporter extends Logging { // The timestamp we report an event that has no input data private var lastNoDataProgressEventTime = Long.MinValue + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) + @volatile protected var currentStatus: StreamingQueryStatus = { new StreamingQueryStatus( @@ -156,7 +160,7 @@ trait ProgressReporter extends Logging { id = id, runId = runId, name = name, - timestamp = currentTriggerStartTimestamp, + timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)), batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, currentWatermark = offsetSeqMetadata.batchWatermarkMs, http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index f768080..d156875 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -29,6 +29,7 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.util.DateTimeUtils /** * :: Experimental :: @@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql]( val id: UUID, val runId: UUID, val name: String, - val timestamp: Long, + val timestamp: String, val batchId: Long, val durationMs: ju.Map[String, java.lang.Long], 
val currentWatermark: Long, @@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql]( ("id" -> JString(id.toString)) ~ ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ - ("timestamp" -> JInt(timestamp)) ~ + ("timestamp" -> JString(timestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ @@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql]( ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ ("sink" -> sink.jsonValue) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 96f19db..193c943 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress1.id.toString}", | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", - | "timestamp" : 1, + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { @@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress2.id.toString}", | "runId" : "${testProgress2.runId.toString}", | "name" : null, - | "timestamp" : 1, + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "durationMs" : { | "total" : 
0 @@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = "myName", - timestamp = 1L, + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, currentWatermark = 3L, @@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = null, // should not be present in the json - timestamp = 1L, + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, currentWatermark = 3L, http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 893cb76..55dd1a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -243,7 +243,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) - assert(progress.timestamp === 100) + assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC assert(progress.numInputRows === 2) assert(progress.processedRowsPerSecond === 2.0) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
