This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b23a6a58e1ec [SPARK-50967][SS][MINOR][FOLLOW-UP] Update `flatMapGroupsWithState` test conf
b23a6a58e1ec is described below

commit b23a6a58e1ec56b8c985b7bd56655b76b13442f6
Author: Amanda Liu <amanda....@databricks.com>
AuthorDate: Fri Apr 18 10:06:22 2025 +0900

    [SPARK-50967][SS][MINOR][FOLLOW-UP] Update `flatMapGroupsWithState` test conf
    
    ### What changes were proposed in this pull request?
    
    Fix the `flatMapGroupsWithState` test so that it does not execute multiple batches where a single batch is expected, which generates incorrect results.
    
    ### Why are the changes needed?
    
    With this fix, the test no longer needs to override the default value of `READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS`, the conf introduced in https://issues.apache.org/jira/browse/SPARK-51747. That conf should not affect this test case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a test-only change.
    
    ### How was this patch tested?
    
    Ran the `flatMapGroupsWithState` test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50628 from asl3/flatMapGroupsWithStatetest.
    
    Authored-by: Amanda Liu <amanda....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 8b33a832b2ba4f8bb7ed34dac50778bf8cbcfa13)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
index c5de270d4cf8..418fbd5603a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
@@ -422,9 +422,7 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest
       s"have same keys and skipEmittingInitialStateKeys=$skipEmittingInitialStateKeys") {
       withSQLConf(
         SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS.key ->
-        skipEmittingInitialStateKeys.toString,
-        // restore behavior before SPARK-51747
-        SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS.key -> "true"
+        skipEmittingInitialStateKeys.toString
       ) {
         val initialState = Seq(
           ("apple", 1L),
@@ -442,11 +440,14 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest
             .groupByKey(x => x)
             .flatMapGroupsWithState(Update, NoTimeout(), initialState)(fruitCountFunc)
         testStream(result, Update)(
+          StartStream(Trigger.ProcessingTime("1 second"), triggerClock = new StreamManualClock),
           AddData(inputData, "apple"),
           AddData(inputData, "apple"),
           AddData(inputData, "orange"),
+          AdvanceManualClock(1 * 1000),
           CheckNewAnswer(("apple", 3), ("orange", 3)),
           AddData(inputData, "orange"),
+          AdvanceManualClock(1 * 1000),
           CheckNewAnswer(("orange", 4)),
           StopStream
         )
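
The `StartStream(..., triggerClock = new StreamManualClock)` and `AdvanceManualClock` steps in the hunk above make batch boundaries depend on the test's own clock rather than on wall-clock timing. As a rough illustration only, the plain-Scala toy model below (no Spark dependency) shows the idea: rows queued between clock advances end up in a single batch. `ManualClock`, `MicroBatchRunner`, and `maybeRunBatch` are hypothetical names invented for this sketch. They are not Spark APIs and do not reproduce how Spark's trigger executor actually works.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a test-controlled clock: time moves only on advance().
final class ManualClock(private var now: Long = 0L) {
  def advance(ms: Long): Unit = now += ms
  def millis: Long = now
}

// Hypothetical toy micro-batch loop: a batch runs only once the clock has
// reached the next trigger time, and it consumes every row queued so far.
final class MicroBatchRunner(clock: ManualClock, intervalMs: Long) {
  private val pending = ArrayBuffer.empty[String]
  private var nextTrigger = clock.millis + intervalMs
  val batches = ArrayBuffer.empty[Seq[String]]

  def addData(rows: String*): Unit = pending ++= rows

  def maybeRunBatch(): Unit =
    if (clock.millis >= nextTrigger && pending.nonEmpty) {
      batches += pending.toList
      pending.clear()
      nextTrigger = clock.millis + intervalMs
    }
}

object ManualClockDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()
    val runner = new MicroBatchRunner(clock, intervalMs = 1000L)

    // Three adds with no clock movement: no batch can start in between.
    runner.addData("apple"); runner.maybeRunBatch()
    runner.addData("apple"); runner.maybeRunBatch()
    runner.addData("orange"); runner.maybeRunBatch()
    assert(runner.batches.isEmpty)

    // Advancing the clock by one interval releases exactly one batch.
    clock.advance(1000L); runner.maybeRunBatch()
    assert(runner.batches.toList == List(List("apple", "apple", "orange")))

    runner.addData("orange")
    clock.advance(1000L); runner.maybeRunBatch()
    assert(runner.batches.last == List("orange"))
  }
}
```

In this model, a wall-clock trigger could fire between any two `addData` calls and split the three rows across batches. With the manual clock, the batch boundary is fixed by the explicit `advance` call.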


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
