This is an automated email from the ASF dual-hosted git repository.
brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2221d3e [SPARK-29314][SS] Don't overwrite the metric "updated" of
state operator to 0 if empty batch is run
2221d3e is described below
commit 2221d3e0183140a0a98f6de92f84d2d924aab703
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Wed Apr 8 16:59:39 2020 -0700
[SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to
0 if empty batch is run
### What changes were proposed in this pull request?
This patch fixes the behavior of ProgressReporter, which always overwrites
the value of "updated" of the state operator to 0 if there's no new data. That
behavior is correct only when we copy the state progress from the "previous"
executed plan, meaning no batch has been run. (A nonzero value of "updated" would
be odd if no batch ran, so overwriting was correct in that case.)
It used to be safe to assume that no data means no batch, but SPARK-24156 allows
a batch to run with empty data if Spark needs to deal with the watermark. After the
patch, the value is overwritten only if both conditions are met: 1) no data and
2) no batch.
### Why are the changes needed?
Currently Spark doesn't report correct metrics when an empty batch is run, and
this patch fixes it.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Modified UT. Note that FlatMapGroupsWithState increases the value of
"updated" when state rows are removed.
Also manually tested via the query below (not a simple query to test with
spark-shell, as you'll run into closure issues in spark-shell while playing with
the state function):
> query
```
/** State kept per key: the running count of values seen for that key (-1 once timed out). */
case class RunningCount(count: Long)
/**
 * Manual reproduction query: feeds a MemoryStream into flatMapGroupsWithState with a
 * processing-time timeout, adding data less often than the trigger fires.
 *
 * NOTE(review): GroupState, GroupStateTimeout, OutputMode, Trigger and MemoryStream are
 * not imported in this snippet; presumably they must be imported from Spark's streaming
 * packages before it compiles - confirm.
 */
object TestFlatMapGroupsWithState {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val ss = SparkSession
      .builder()
      .appName("TestFlatMapGroupsWithState")
      .getOrCreate()
    ss.conf.set("spark.sql.shuffle.partitions", "5")
    import ss.implicits._
    // Emits (key, count) for live keys and (key, "-1") when the key's state times out.
    val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
      if (state.hasTimedOut) {
        // End users are not restricted to remove the state here - they can update the
        // state as well. For example, event time session window would have list of
        // sessions here and it cannot remove entire state.
        state.update(RunningCount(-1))
        Iterator((key, "-1"))
      } else {
        val count = state.getOption.map(_.count).getOrElse(0L) + values.size
        state.update(RunningCount(count))
        state.setTimeoutDuration("1 seconds")
        Iterator((key, count.toString))
      }
    }
    implicit val sqlContext = ss.sqlContext
    val inputData = MemoryStream[String]
    val result = inputData
      .toDF()
      .as[String]
      .groupByKey { v => v }
      .flatMapGroupsWithState(
        OutputMode.Append(),
        GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)
    val query = result
      .writeStream
      .format("memory")
      .option("queryName", "test")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 second"))
      .start()
    Thread.sleep(1000)
    var chIdx: Long = 0
    while (true) {
      // Add five new keys per iteration; addData is called only for its side effect.
      (chIdx to chIdx + 4).foreach { idx => inputData.addData(idx.toString) }
      chIdx += 5
      // intentionally sleep much more than trigger to enable "empty" batch
      Thread.sleep(10 * 1000)
    }
  }
}
```
> before the patch (batch 3 which was an "empty" batch)
```
{
"id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
"runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
"name":"test",
"timestamp":"2019-11-18T07:00:25.005Z",
"batchId":3,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0,
"durationMs":{
"addBatch":1664,
"getBatch":0,
"latestOffset":0,
"queryPlanning":29,
"triggerExecution":1789,
"walCommit":51
},
"stateOperators":[
{
"numRowsTotal":10,
"numRowsUpdated":0,
"memoryUsedBytes":5130,
"customMetrics":{
"loadedMapCacheHitCount":15,
"loadedMapCacheMissCount":0,
"stateOnCurrentVersionSizeBytes":2722
}
}
],
"sources":[
{
"description":"MemoryStream[value#1]",
"startOffset":9,
"endOffset":9,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0
}
],
"sink":{
"description":"MemorySink",
"numOutputRows":5
}
}
```
> after the patch (batch 3 which was an "empty" batch)
```
{
"id":"7cb41623-6b9a-408e-ae02-6796ec636fa0",
"runId":"17847710-ddfe-45f5-a7ab-b160e149382f",
"name":"test",
"timestamp":"2019-11-18T07:02:25.005Z",
"batchId":3,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0,
"durationMs":{
"addBatch":1196,
"getBatch":0,
"latestOffset":0,
"queryPlanning":30,
"triggerExecution":1333,
"walCommit":46
},
"stateOperators":[
{
"numRowsTotal":10,
"numRowsUpdated":5,
"memoryUsedBytes":5130,
"customMetrics":{
"loadedMapCacheHitCount":15,
"loadedMapCacheMissCount":0,
"stateOnCurrentVersionSizeBytes":2722
}
}
],
"sources":[
{
"description":"MemoryStream[value#1]",
"startOffset":9,
"endOffset":9,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0
}
],
"sink":{
"description":"MemorySink",
"numOutputRows":5
}
}
```
"numRowsUpdated" is `0` in "stateOperators" before applying the patch which
is "wrong", as we "update" the state when timeout occurs. After applying the
patch, it correctly represents the "numRowsUpdated" as `5` in "stateOperators".
Closes #25987 from HeartSaVioR/SPARK-29314.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
(cherry picked from commit ca2ba4fe647cd60668410b68014a3991ad7fd5c9)
Signed-off-by: Burak Yavuz <[email protected]>
---
.../spark/sql/execution/streaming/ProgressReporter.scala | 12 ++++++------
.../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala | 2 +-
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d1086cd..0dff1c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -154,7 +154,7 @@ trait ProgressReporter extends Logging {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets !=
null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
- val executionStats = extractExecutionStats(hasNewData)
+ val executionStats = extractExecutionStats(hasNewData, hasExecuted)
val processingTimeMills = currentTriggerEndTimestamp -
currentTriggerStartTimestamp
val processingTimeSec = Math.max(1L, processingTimeMills).toDouble /
MILLIS_PER_SECOND
@@ -215,26 +215,26 @@ trait ProgressReporter extends Logging {
}
/** Extract statistics about stateful operators from the executed query
plan. */
- private def extractStateOperatorMetrics(hasNewData: Boolean):
Seq[StateOperatorProgress] = {
+ private def extractStateOperatorMetrics(hasExecuted: Boolean):
Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
- // lastExecution could belong to one of the previous triggers if
`!hasNewData`.
+ // lastExecution could belong to one of the previous triggers if
`!hasExecuted`.
// Walking the plan again should be inexpensive.
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
- if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0)
+ if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
}
}
/** Extracts statistics from the most recent query execution. */
- private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+ private def extractExecutionStats(hasNewData: Boolean, hasExecuted:
Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e
}.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" ->
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]
// SPARK-19378: Still report metrics even though no data was processed
while reporting progress.
- val stateOperators = extractStateOperatorMetrics(hasNewData)
+ val stateOperators = extractStateOperatorMetrics(hasExecuted)
if (!hasNewData) {
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index d36c64f..b04f8b0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends
StateStoreMetricsTest {
}
},
CheckNewAnswer(("c", "-1")),
- assertNumStateRows(total = 0, updated = 0)
+ assertNumStateRows(total = 0, updated = 1)
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]