This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/routineload_flexible_update by
this push:
new 15961d50197 fix: Add waitForTaskFinishMoW for MOW table routine load
tests
15961d50197 is described below
commit 15961d50197eedaee0d0b7c9b5ac2e7a62a83bdd
Author: Yongqiang YANG <[email protected]>
AuthorDate: Mon Jan 19 22:28:38 2026 -0800
fix: Add waitForTaskFinishMoW for MOW table routine load tests
For MOW (Merge-on-Write) unique key tables, the regular row count
doesn't work properly during partial updates. Added waitForTaskFinishMoW
that sets skip_delete_bitmap=true and skip_delete_sign=true before
counting rows.
Changes:
- Added waitForTaskFinishMoW() in RoutineLoadTestUtils
- Refactored to use internal helper to share logic
- Updated flexible partial update tests to use MoW variant
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
.../regression/util/RoutineLoadTestUtils.groovy | 25 +++++++++++++++++++++-
...est_routine_load_flexible_partial_update.groovy | 14 ++++++------
2 files changed, 31 insertions(+), 8 deletions(-)
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index 9a5e27d2680..49711089ab8 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -107,6 +107,18 @@ class RoutineLoadTestUtils {
}
static int waitForTaskFinish(Closure sqlRunner, String job, String
tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+ return waitForTaskFinishInternal(sqlRunner, job, tableName,
expectedMinRows, maxAttempts, false)
+ }
+
+ /**
+ * Wait for routine load task to finish for MOW (Merge-on-Write) unique
key tables.
+ * Sets skip_delete_bitmap=true and skip_delete_sign=true while counting rows (both are reset to false afterwards) to properly count rows during partial
update operations.
+ */
+ static int waitForTaskFinishMoW(Closure sqlRunner, String job, String
tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+ return waitForTaskFinishInternal(sqlRunner, job, tableName,
expectedMinRows, maxAttempts, true)
+ }
+
+ private static int waitForTaskFinishInternal(Closure sqlRunner, String
job, String tableName, int expectedMinRows, int maxAttempts, boolean isMoW) {
def count = 0
while (true) {
def res = sqlRunner.call("show routine load for ${job}")
@@ -114,7 +126,18 @@ class RoutineLoadTestUtils {
def statistic = res[0][14].toString()
logger.info("Routine load state: ${routineLoadState}")
logger.info("Routine load statistic: ${statistic}")
- def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+ def rowCount
+ if (isMoW) {
+ // For MOW tables, enable skip_delete_bitmap and skip_delete_sign to properly count
rows
+ sqlRunner.call("set skip_delete_bitmap=true")
+ sqlRunner.call("set skip_delete_sign=true")
+ sqlRunner.call("sync")
+ rowCount = sqlRunner.call("select count(*) from ${tableName}")
+ sqlRunner.call("set skip_delete_bitmap=false")
+ sqlRunner.call("set skip_delete_sign=false")
+ } else {
+ rowCount = sqlRunner.call("select count(*) from ${tableName}")
+ }
if (routineLoadState == "RUNNING" && rowCount[0][0] >
expectedMinRows) {
break
}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 46ba11b5a81..4cfb58d3948 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -106,7 +106,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
// wait for routine load task to finish
// Initial: 5 rows, Kafka: 4 messages (3 updates + 1 insert),
Expected: 6 rows
// waitForTaskFinish waits until rowCount > expectedMinRows, so
pass 5
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 5)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1,
tableName1, 5)
// verify flexible partial update results
qt_select_after_flex_update1 "SELECT id, name, score, age FROM
${tableName1} ORDER BY id"
@@ -183,7 +183,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2,
tableName2, 3)
qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM
${tableName2} ORDER BY id"
} catch (Exception e) {
@@ -426,7 +426,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job7, tableName7, 3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job7,
tableName7, 3)
// verify: id=1 should NOT be updated (filtered by WHERE),
id=2,3,4 should be updated
qt_select_after_flex_where "SELECT id, name, score, age FROM
${tableName7} ORDER BY id"
@@ -620,7 +620,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job11, tableName11,
3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11,
tableName11, 3)
qt_select_after_fixed_update "SELECT id, name, score, age FROM
${tableName11} ORDER BY id"
} catch (Exception e) {
@@ -715,7 +715,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job12, tableName12,
3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12,
tableName12, 3)
// verify flexible partial update results after alter
qt_select_after_alter_flex "SELECT id, name, score, age FROM
${tableName12} ORDER BY id"
@@ -1108,7 +1108,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job18, tableName18,
2)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job18,
tableName18, 2)
// verify: id=1 should NOT be updated (filtered by WHERE), id=2,3
should be updated
qt_select_after_alter_flex_where "SELECT id, name, score, age FROM
${tableName18} ORDER BY id"
@@ -1258,7 +1258,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job20, tableName20,
2)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20,
tableName20, 2)
// with UPSERT, id=1 should have NULL for name and age (full row
replacement)
qt_select_after_alter_upsert "SELECT id, name, score, age FROM
${tableName20} ORDER BY id"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]