This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 309407c2c412 [SPARK-54977][PYTHON][TEST] Accelerate test_observation.py
309407c2c412 is described below
commit 309407c2c412c631c9ed11b3d01278a087797f33
Author: Tian Gao <[email protected]>
AuthorDate: Fri Jan 9 15:12:50 2026 +0900
[SPARK-54977][PYTHON][TEST] Accelerate test_observation.py
### What changes were proposed in this pull request?
Instead of waiting a fixed 10s in test_observation.py, we poll until the
observed metrics pass the assertions (with a 10s timeout).
### Why are the changes needed?
Test time 10s -> 1s.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The test passed locally.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53740 from gaogaotiantian/optimize-test-observation.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/tests/test_observation.py | 28 +++++++++++++++-------------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/sql/tests/test_observation.py b/python/pyspark/sql/tests/test_observation.py
index 2520e3b2352b..f7a8b20a66ce 100644
--- a/python/pyspark/sql/tests/test_observation.py
+++ b/python/pyspark/sql/tests/test_observation.py
@@ -15,8 +15,6 @@
# limitations under the License.
#
-import time
-
from pyspark.sql import Row, Observation, functions as F
from pyspark.sql.types import StructType, LongType
from pyspark.errors import (
@@ -25,7 +23,7 @@ from pyspark.errors import (
PySparkValueError,
)
from pyspark.testing.sqlutils import ReusedSQLTestCase
-from pyspark.testing.utils import assertDataFrameEqual
+from pyspark.testing.utils import assertDataFrameEqual, eventually
class DataFrameObservationTestsMixin:
@@ -143,17 +141,21 @@ class DataFrameObservationTestsMixin:
)
q = df.writeStream.format("noop").queryName("test").start()
self.assertTrue(q.isActive)
- time.sleep(10)
- q.stop()
- self.assertTrue(isinstance(observed_metrics, dict))
- self.assertTrue("metric" in observed_metrics)
- row = observed_metrics["metric"]
- self.assertTrue(isinstance(row, Row))
- self.assertTrue(hasattr(row, "cnt"))
- self.assertTrue(hasattr(row, "sum"))
- self.assertGreaterEqual(row.cnt, 0)
- self.assertGreaterEqual(row.sum, 0)
+ @eventually(timeout=10, catch_assertions=True)
+ def check_observed_metrics():
+ self.assertTrue(isinstance(observed_metrics, dict))
+ self.assertTrue("metric" in observed_metrics)
+ row = observed_metrics["metric"]
+ self.assertIsInstance(row.cnt, int)
+ self.assertIsInstance(row.sum, int)
+ self.assertGreaterEqual(row.cnt, 0)
+ self.assertGreaterEqual(row.sum, 0)
+ return True
+
+ check_observed_metrics()
+
+ q.stop()
def test_observe_with_same_name_on_different_dataframe(self):
# SPARK-45656: named observations with the same name on different datasets
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]