This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e567fd14d57 branch-3.0: [fix](test) fix test_routine_load_job_schedule
unstable (#54889)
e567fd14d57 is described below
commit e567fd14d57e1dbe859554f39261b0eb5cec03bb
Author: hui lai <[email protected]>
AuthorDate: Sat Aug 16 16:02:08 2025 +0800
branch-3.0: [fix](test) fix test_routine_load_job_schedule unstable (#54889)
### What problem does this PR solve?
pick https://github.com/apache/doris/pull/54341
---
.../load_p0/routine_load/test_routine_load_job_schedule.groovy | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
index c892d9dbde0..cb8fba35830 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
@@ -38,7 +38,6 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
def producer = new KafkaProducer<>(props)
def adminClient = AdminClient.create(props)
- def newTopic = new NewTopic(kafkaCsvTpoics[0], 5, (short)1)
def testData = [
"1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1",
"2,test_data_2,2023-01-02,value2,2023-01-02 11:00:00,extra2",
@@ -46,10 +45,9 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
"4,test_data_4,2023-01-04,value4,2023-01-04 13:00:00,extra4",
"5,test_data_5,2023-01-05,value5,2023-01-05 14:00:00,extra5"
]
- adminClient.createTopics(Collections.singletonList(newTopic))
- testData.eachWithIndex { line, index ->
+ testData.each { line->
logger.info("Sending data to kafka: ${line}")
- def record = new ProducerRecord<>(newTopic.name(), index, null,
line)
+ def record = new ProducerRecord<>(kafkaCsvTpoics[0], null, line)
producer.send(record)
}
producer.close()
@@ -95,7 +93,7 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
logger.info("Routine load state: ${routineLoadState}")
logger.info("Routine load statistic: ${statistic}")
def rowCount = sql "select count(*) from ${tableName}"
- if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
+ if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
break
}
if (count > maxWaitCount) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]