This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 309407c2c412 [SPARK-54977][PYTHON][TEST] Accelerate test_observation.py
309407c2c412 is described below

commit 309407c2c412c631c9ed11b3d01278a087797f33
Author: Tian Gao <[email protected]>
AuthorDate: Fri Jan 9 15:12:50 2026 +0900

    [SPARK-54977][PYTHON][TEST] Accelerate test_observation.py
    
    ### What changes were proposed in this pull request?
    
    Instead of waiting for 10s in test_observation.py, we just poll until the
    observed metrics are correct.
    
    ### Why are the changes needed?
    
    Test time 10s -> 1s.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Locally passed
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53740 from gaogaotiantian/optimize-test-observation.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/tests/test_observation.py | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/tests/test_observation.py b/python/pyspark/sql/tests/test_observation.py
index 2520e3b2352b..f7a8b20a66ce 100644
--- a/python/pyspark/sql/tests/test_observation.py
+++ b/python/pyspark/sql/tests/test_observation.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 #
 
-import time
-
 from pyspark.sql import Row, Observation, functions as F
 from pyspark.sql.types import StructType, LongType
 from pyspark.errors import (
@@ -25,7 +23,7 @@ from pyspark.errors import (
     PySparkValueError,
 )
 from pyspark.testing.sqlutils import ReusedSQLTestCase
-from pyspark.testing.utils import assertDataFrameEqual
+from pyspark.testing.utils import assertDataFrameEqual, eventually
 
 
 class DataFrameObservationTestsMixin:
@@ -143,17 +141,21 @@ class DataFrameObservationTestsMixin:
         )
         q = df.writeStream.format("noop").queryName("test").start()
         self.assertTrue(q.isActive)
-        time.sleep(10)
-        q.stop()
 
-        self.assertTrue(isinstance(observed_metrics, dict))
-        self.assertTrue("metric" in observed_metrics)
-        row = observed_metrics["metric"]
-        self.assertTrue(isinstance(row, Row))
-        self.assertTrue(hasattr(row, "cnt"))
-        self.assertTrue(hasattr(row, "sum"))
-        self.assertGreaterEqual(row.cnt, 0)
-        self.assertGreaterEqual(row.sum, 0)
+        @eventually(timeout=10, catch_assertions=True)
+        def check_observed_metrics():
+            self.assertTrue(isinstance(observed_metrics, dict))
+            self.assertTrue("metric" in observed_metrics)
+            row = observed_metrics["metric"]
+            self.assertIsInstance(row.cnt, int)
+            self.assertIsInstance(row.sum, int)
+            self.assertGreaterEqual(row.cnt, 0)
+            self.assertGreaterEqual(row.sum, 0)
+            return True
+
+        check_observed_metrics()
+
+        q.stop()
 
     def test_observe_with_same_name_on_different_dataframe(self):
         # SPARK-45656: named observations with the same name on different datasets


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to