This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new b23a6a58e1ec [SPARK-50967][SS][MINOR][FOLLOW-UP] Update `flatMapGroupsWithState` test conf b23a6a58e1ec is described below commit b23a6a58e1ec56b8c985b7bd56655b76b13442f6 Author: Amanda Liu <amanda....@databricks.com> AuthorDate: Fri Apr 18 10:06:22 2025 +0900 [SPARK-50967][SS][MINOR][FOLLOW-UP] Update `flatMapGroupsWithState` test conf ### What changes were proposed in this pull request? Fix the `flatMapGroupsWithState` test so that a manual clock controls when each micro-batch runs; this prevents the input from being split across multiple batches, which generated incorrect results. ### Why are the changes needed? With this fix, the test no longer needs to override the default setting of `READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS`, the conf introduced in https://issues.apache.org/jira/browse/SPARK-51747 (which should not affect this test case). ### Does this PR introduce _any_ user-facing change? No, test only. ### How was this patch tested? By running the `flatMapGroupsWithState` test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50628 from asl3/flatMapGroupsWithStatetest. 
Authored-by: Amanda Liu <amanda....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 8b33a832b2ba4f8bb7ed34dac50778bf8cbcfa13) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala index c5de270d4cf8..418fbd5603a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala @@ -422,9 +422,7 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest s"have same keys and skipEmittingInitialStateKeys=$skipEmittingInitialStateKeys") { withSQLConf( SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS.key -> - skipEmittingInitialStateKeys.toString, - // restore behavior before SPARK-51747 - SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS.key -> "true" + skipEmittingInitialStateKeys.toString ) { val initialState = Seq( ("apple", 1L), @@ -442,11 +440,14 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest .groupByKey(x => x) .flatMapGroupsWithState(Update, NoTimeout(), initialState)(fruitCountFunc) testStream(result, Update)( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = new StreamManualClock), AddData(inputData, "apple"), AddData(inputData, "apple"), AddData(inputData, "orange"), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("apple", 3), ("orange", 3)), AddData(inputData, "orange"), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("orange", 4)), StopStream ) 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org