This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7a5fdf1dd [SPARK-41996][SQL][SS] Fix kafka test to verify lost
partitions to account for slow Kafka operations
3d7a5fdf1dd is described below
commit 3d7a5fdf1ddf1e9748b568d66ab366f3c0ff5e55
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Thu Jan 12 12:49:57 2023 +0900
[SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account
for slow Kafka operations
### What changes were proposed in this pull request?
Fix the Kafka test that verifies lost partitions so that it accounts for slow
Kafka operations.
Kafka operations such as topic deletion and partition creation can take longer
than the streaming query timeout, which fails the query and makes the test fail
incorrectly. This change raises the query termination timeout used by the two
affected tests to 300 seconds.
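The effect of the timeout can be illustrated with a minimal, Spark-free sketch. `FakeQuery` and `TimeoutSketch` are hypothetical stand-ins, not part of this patch or of Spark; they only imitate the documented contract of `StreamingQuery.awaitTermination(timeoutMs)`, which returns `false` if the query is still running when the timeout expires and rethrows the failure if the query terminated with an exception. The 500 ms delay is an arbitrary placeholder for a slow Kafka operation.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.duration._

// Hypothetical stand-in for a streaming query; this is NOT Spark's StreamingQuery.
final class FakeQuery(failAfter: FiniteDuration) {
  private val done = new CountDownLatch(1)
  @volatile private var error: Option[Throwable] = None

  private val worker = new Thread(() => {
    try {
      // Placeholder for slow topic deletion / partition creation.
      Thread.sleep(failAfter.toMillis)
      error = Some(new IllegalStateException("partitions lost"))
    } catch {
      case _: InterruptedException => () // stopped before the failure occurred
    } finally {
      done.countDown()
    }
  })
  worker.setDaemon(true)
  worker.start()

  // Imitates the contract: false on timeout, rethrow if terminated with an error.
  def awaitTermination(timeoutMs: Long): Boolean = {
    val finished = done.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (finished) error.foreach(e => throw e)
    finished
  }

  def stop(): Unit = worker.interrupt()
}

object TimeoutSketch {
  // Reports what a caller observes for a given termination timeout.
  def outcome(timeout: FiniteDuration): String = {
    val query = new FakeQuery(failAfter = 500.millis)
    try {
      if (query.awaitTermination(timeout.toMillis)) "terminated cleanly"
      else "timed out before the failure surfaced"
    } catch {
      case e: IllegalStateException => s"failed as expected: ${e.getMessage}"
    } finally {
      query.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    println(outcome(50.millis))  // timeout shorter than the slow operation
    println(outcome(5.seconds))  // timeout long enough for the failure to surface
  }
}
```

With the short timeout the caller only sees `false`, so an `assert` on the return value fails before the expected exception is ever thrown; with the longer timeout the expected exception reaches the caller.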
### Why are the changes needed?
The change is required to avoid test flakiness when Kafka operations become
slower.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test-only change.
Reran the tests multiple times:
```
[info] Assembly jar up to date: /Users/anish.shrigondekar/spark/spark/connector/protobuf/target/scala-2.12/spark-protobuf-assembly-3.4.0-SNAPSHOT.jar
[info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches (6 seconds, 440 milliseconds)
[info] - Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches (6 seconds, 331 milliseconds)
[info] Run completed in 17 seconds, 269 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 52 s, completed Jan 11, 2023, 6:05:24
```
Closes #39520 from anishshri-db/SPARK-41996.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 18d63a9a4ef..d63b9805e55 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -358,10 +358,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
.start()
}
+ // SPARK-41996 - Increase query termination timeout to ensure that
+ // Kafka operations can be completed
+ val queryTimeout = 300.seconds
val exc = intercept[Exception] {
val query = startTriggerAvailableNowQuery()
try {
- assert(query.awaitTermination(streamingTimeout.toMillis))
+ assert(query.awaitTermination(queryTimeout.toMillis))
} finally {
query.stop()
}
@@ -409,10 +412,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
.start()
}
+ // SPARK-41996 - Increase query termination timeout to ensure that
+ // Kafka operations can be completed
+ val queryTimeout = 300.seconds
val exc = intercept[StreamingQueryException] {
val query = startTriggerAvailableNowQuery()
try {
- assert(query.awaitTermination(streamingTimeout.toMillis))
+ assert(query.awaitTermination(queryTimeout.toMillis))
} finally {
query.stop()
}