This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/routineload_flexible_update by
this push:
new 869af732e41 fix: Improve MOW table routine load test reliability
869af732e41 is described below
commit 869af732e419689499fbd8576e656f226589ef42
Author: Yongqiang YANG <[email protected]>
AuthorDate: Mon Jan 19 22:38:12 2026 -0800
fix: Improve MOW table routine load test reliability
1. waitForTaskFinishMoW now uses the accumulated loadedRows from the
routine load statistics instead of counting table rows directly. This
avoids issues with the delete bitmap during partial update operations.
2. Added disable_auto_compaction=true to all MOW tables in the test
to prevent auto compaction from interfering with test results.
3. Updated the expectedMinRows thresholds to match Kafka message counts
(the wait succeeds once loadedRows > expectedMinRows):
- Test 1: 4 messages -> wait for loadedRows > 3
- Test 2: 3 messages -> wait for loadedRows > 2
- Test 7: 4 messages -> wait for loadedRows > 3
- Test 11: 3 messages -> wait for loadedRows > 2
- Test 12: 3 messages -> wait for loadedRows > 2
- Test 18: 3 messages -> wait for loadedRows > 2
- Test 20: 2 messages -> wait for loadedRows > 1
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
.../regression/util/RoutineLoadTestUtils.groovy | 20 +++++------
...est_routine_load_flexible_partial_update.groovy | 39 ++++++++++++++++++----
2 files changed, 42 insertions(+), 17 deletions(-)
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index 49711089ab8..2b7ce937649 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -120,25 +120,25 @@ class RoutineLoadTestUtils {
private static int waitForTaskFinishInternal(Closure sqlRunner, String
job, String tableName, int expectedMinRows, int maxAttempts, boolean isMoW) {
def count = 0
+ def jsonSlurper = new JsonSlurper()
while (true) {
def res = sqlRunner.call("show routine load for ${job}")
def routineLoadState = res[0][8].toString()
def statistic = res[0][14].toString()
logger.info("Routine load state: ${routineLoadState}")
logger.info("Routine load statistic: ${statistic}")
- def rowCount
+ def checkValue
if (isMoW) {
- // For MOW tables, use skip_delete_bitmap to properly count
rows
- sqlRunner.call("set skip_delete_bitmap=true")
- sqlRunner.call("set skip_delete_sign=true")
- sqlRunner.call("sync")
- rowCount = sqlRunner.call("select count(*) from ${tableName}")
- sqlRunner.call("set skip_delete_bitmap=false")
- sqlRunner.call("set skip_delete_sign=false")
+ // For MOW tables, use accumulated loadedRows from statistics
+ // This avoids issues with delete bitmap during partial updates
+ def json = jsonSlurper.parseText(statistic)
+ checkValue = json.loadedRows ?: 0
+ logger.info("MOW table accumulated loadedRows: ${checkValue}")
} else {
- rowCount = sqlRunner.call("select count(*) from ${tableName}")
+ def rowCount = sqlRunner.call("select count(*) from
${tableName}")
+ checkValue = rowCount[0][0]
}
- if (routineLoadState == "RUNNING" && rowCount[0][0] >
expectedMinRows) {
+ if (routineLoadState == "RUNNING" && checkValue > expectedMinRows)
{
break
}
if (count > maxAttempts) {
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 4cfb58d3948..f7abaad6a24 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -44,6 +44,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -104,9 +105,8 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
producer.flush()
// wait for routine load task to finish
- // Initial: 5 rows, Kafka: 4 messages (3 updates + 1 insert),
Expected: 6 rows
- // waitForTaskFinish waits until rowCount > expectedMinRows, so
pass 5
- RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1,
tableName1, 5)
+ // Kafka: 4 messages, wait for loadedRows > 3
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1,
tableName1, 3)
// verify flexible partial update results
qt_select_after_flex_update1 "SELECT id, name, score, age FROM
${tableName1} ORDER BY id"
@@ -136,6 +136,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -183,7 +184,8 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2,
tableName2, 3)
+ // Kafka: 3 messages, wait for loadedRows > 2
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2,
tableName2, 2)
qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM
${tableName2} ORDER BY id"
} catch (Exception e) {
@@ -209,6 +211,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -249,6 +252,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -291,6 +295,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -333,6 +338,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -376,6 +382,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -426,6 +433,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
+ // Kafka: 4 messages, wait for loadedRows > 3
RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job7,
tableName7, 3)
// verify: id=1 should NOT be updated (filtered by WHERE),
id=2,3,4 should be updated
@@ -453,6 +461,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "false"
);
@@ -493,6 +502,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
);
@@ -533,6 +543,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
);
@@ -574,6 +585,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true"
);
"""
@@ -620,7 +632,8 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11,
tableName11, 3)
+ // Kafka: 3 messages, wait for loadedRows > 2
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11,
tableName11, 2)
qt_select_after_fixed_update "SELECT id, name, score, age FROM
${tableName11} ORDER BY id"
} catch (Exception e) {
@@ -647,6 +660,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -715,7 +729,8 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12,
tableName12, 3)
+ // Kafka: 3 messages, wait for loadedRows > 2
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12,
tableName12, 2)
// verify flexible partial update results after alter
qt_select_after_alter_flex "SELECT id, name, score, age FROM
${tableName12} ORDER BY id"
@@ -742,6 +757,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "false"
);
@@ -801,6 +817,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -861,6 +878,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -921,6 +939,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -981,6 +1000,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -1042,6 +1062,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -1108,6 +1129,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
+ // Kafka: 3 messages, wait for loadedRows > 2
RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job18,
tableName18, 2)
// verify: id=1 should NOT be updated (filtered by WHERE), id=2,3
should be updated
@@ -1193,6 +1215,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
@@ -1258,7 +1281,8 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
}
producer.flush()
- RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20,
tableName20, 2)
+ // Kafka: 2 messages, wait for loadedRows > 1
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20,
tableName20, 1)
// with UPSERT, id=1 should have NULL for name and age (full row
replacement)
qt_select_after_alter_upsert "SELECT id, name, score, age FROM
${tableName20} ORDER BY id"
@@ -1286,6 +1310,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]