This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d3119fac0a09 [SPARK-50378][SS] Add custom metric for tracking time spent
processing initial state in transformWithState
d3119fac0a09 is described below
commit d3119fac0a09a2c6290762c9ba573378ccf30dfc
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Fri Nov 22 12:05:22 2024 +0900
[SPARK-50378][SS] Add custom metric for tracking time spent processing initial
state in transformWithState
### What changes were proposed in this pull request?
Add a custom metric, `initialStateProcessingTimeMs`, for tracking the time spent
processing initial state in transformWithState.
### Why are the changes needed?
Adds tracking for the time spent populating initial state, reported as a custom stateful operator metric.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
```
[info] Run completed in 2 minutes, 38 seconds.
[info] Total number of tests run: 22
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48913 from anishshri-db/task/SPARK-50378.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/execution/streaming/TransformWithStateExec.scala | 6 ++++++
.../spark/sql/streaming/TransformWithStateInitialStateSuite.scala | 8 ++++++++
2 files changed, 14 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 2b26d18019d1..107f98b09f85 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -410,6 +410,9 @@ case class TransformWithStateExec(
// operator specific metrics
override def customStatefulOperatorMetrics:
Seq[StatefulOperatorCustomMetric] = {
Seq(
+ // metrics around initial state
+ StatefulOperatorCustomSumMetric("initialStateProcessingTimeMs",
+ "Number of milliseconds taken to process all initial state"),
// metrics around state variables
StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value
state variables"),
StatefulOperatorCustomSumMetric("numListStateVars", "Number of list
state variables"),
@@ -655,6 +658,8 @@ case class TransformWithStateExec(
statefulProcessor.init(outputMode, timeMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+ val initialStateProcTimeMs = longMetric("initialStateProcessingTimeMs")
+ val initialStateStartTimeNs = System.nanoTime
// Check if is first batch
// Only process initial states for first batch
if (processorHandle.getQueryInfo().getBatchId == 0) {
@@ -667,6 +672,7 @@ case class TransformWithStateExec(
processInitialStateRows(keyRow.asInstanceOf[UnsafeRow], valueRowIter)
}
}
+ initialStateProcTimeMs += NANOSECONDS.toMillis(System.nanoTime -
initialStateStartTimeNs)
processDataWithPartition(childDataIterator, store, processorHandle)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
index 360656a76f35..806d2f19f6f5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
@@ -395,6 +395,10 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
AddData(inputData, InitInputRow("k2", "update", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
CheckNewAnswer(("non-exist", "getOption", -1.0)),
+ Execute { q =>
+ assert(q.lastProgress
+
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
+ },
AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
@@ -514,6 +518,10 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
AdvanceManualClock(1 * 1000),
// registered timer for "a" and "b" is 6000, first batch is processed
at ts = 1000
CheckNewAnswer(("c", "1")),
+ Execute { q =>
+ assert(q.lastProgress
+
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
+ },
AddData(inputData, "c"),
AdvanceManualClock(6 * 1000), // ts = 7000, "a" expires
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]