This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d25ff38578788e474b9f4f23eb67a59eff250446
Author: Yongqiang YANG <[email protected]>
AuthorDate: Wed Jan 14 16:14:15 2026 -0800

    fix: Resolve code review issues for flexible partial update

    1. Fix exception type mismatch in KafkaRoutineLoadJob.replayModifyProperties
       - Changed catch block from DdlException to UserException since
         modifyPropertiesInternal now throws UserException

    2. Fix setSchemaForPartialUpdate not called for flexible partial update
       - Changed condition from isPartialUpdate to check both UPDATE_FIXED_COLUMNS
         and UPDATE_FLEXIBLE_COLUMNS modes
       - Aligns with StreamLoadHandler behavior

    3. Update tests to allow WHERE clause and jsonpaths with flexible partial update
       - Tests 4, 7, 16, 18 now verify these features work correctly
       - Added expected output for new success test cases
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   3 +-
 .../test_routine_load_flexible_partial_update.out  |  40 ++++
 ...est_routine_load_flexible_partial_update.groovy | 236 ++++++++++++++++-----
 4 files changed, 230 insertions(+), 51 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index ebb3ed0f7fb..7cd2a71d2f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -830,7 +830,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
             modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
-        } catch (DdlException e) {
+        } catch (UserException e) {
             // should not happen
             LOG.error("failed to replay modify kafka routine load job: {}", id, e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 4554a2c2066..2dfad5c52cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1068,7 +1068,8 @@ public abstract class RoutineLoadJob
             throw new UserException("txn does not exist: " + txnId);
         }
         txnState.addTableIndexes(planner.getDestTable());
-        if (isPartialUpdate) {
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
+                || uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
             txnState.setSchemaForPartialUpdate((OlapTable) table);
         }
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
index 51176ff3f19..fdcd03a5560 100644
--- a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -25,6 +25,28 @@
 3 1000 2000 3000 4000
 4 \N 9876 4444 1234
 
+-- !select_initial4 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_flex_jsonpaths --
+1 alice_updated 150 20
+2 bob_updated 95 21
+3 charlie 80 22
+4 diana 70 \N
+
+-- !select_initial7 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 chuck 80 22
+4 diana 70 \N
+
 -- !select_initial11 --
 1 alice 100 20
 2 bob 90 21
@@ -47,6 +69,24 @@
 3 charlie 80 22
 4 diana \N \N
 
+-- !select_initial16 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_flex_jsonpaths --
+1 alice_updated 150 20
+2 bob 90 21
+3 charlie 80 \N
+
+-- !select_initial18 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 charlie 80 \N
+
 -- !select_initial20 --
 1 alice 100 20
 2 bob 90 21
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 1d00f12adc5..8d8d08fb179 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -232,17 +232,18 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         exception "Flexible partial update only supports JSON format"
     }
 
-    // Test 4: Error case - jsonpaths not supported
-    def kafkaJsonTopic4 = "test_routine_load_flexible_partial_update_jsonpaths_error"
-    def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
-    def job4 = "test_flex_partial_update_job_jsonpaths_error"
+    // Test 4: Success case - jsonpaths works with flexible partial update
+    def kafkaJsonTopic4 = "test_routine_load_flexible_partial_update_jsonpaths"
+    def tableName4 = "test_routine_load_flex_update_jsonpaths"
+    def job4 = "test_flex_partial_update_job_jsonpaths"
 
     sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName4} (
             `id` int NOT NULL,
             `name` varchar(65533) NULL,
-            `score` int NULL
+            `score` int NULL,
+            `age` int NULL
         ) ENGINE=OLAP
         UNIQUE KEY(`id`)
         DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -254,14 +255,25 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         );
     """
 
-    test {
+    // insert initial data
+    sql """
+        INSERT INTO ${tableName4} VALUES
+        (1, 'alice', 100, 20),
+        (2, 'bob', 90, 21),
+        (3, 'charlie', 80, 22)
+    """
+
+    qt_select_initial4 "SELECT id, name, score, age FROM ${tableName4} ORDER BY id"
+
+    try {
+        // create routine load with jsonpaths and flexible partial update
         sql """
             CREATE ROUTINE LOAD ${job4} ON ${tableName4}
             PROPERTIES
             (
                 "max_batch_interval" = "10",
                 "format" = "json",
-                "jsonpaths" = '["\\$.id", "\\$.name"]',
+                "jsonpaths" = '["\\$.id", "\\$.name", "\\$.score"]',
                 "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
             )
             FROM KAFKA
@@ -271,7 +283,30 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
                 "property.kafka_default_offsets" = "OFFSET_BEGINNING"
             );
         """
-        exception "Flexible partial update does not support jsonpaths"
+
+        // send JSON data - jsonpaths extracts id, name, score (age is not in jsonpaths so remains unchanged)
+        def data4 = [
+            '{"id": 1, "name": "alice_updated", "score": 150}',
+            '{"id": 2, "name": "bob_updated", "score": 95}',
+            '{"id": 4, "name": "diana", "score": 70}'
+        ]
+
+        data4.each { line ->
+            logger.info("Sending to Kafka: ${line}")
+            def record = new ProducerRecord<>(kafkaJsonTopic4, null, line)
+            producer.send(record).get()
+        }
+        producer.flush()
+
+        RoutineLoadTestUtils.waitForTaskFinish(runSql, job4, tableName4, 3)
+
+        // verify flexible partial update with jsonpaths works
+        qt_select_after_flex_jsonpaths "SELECT id, name, score, age FROM ${tableName4} ORDER BY id"
+    } catch (Exception e) {
+        logger.error("Error during test: " + e.getMessage())
+        throw e
+    } finally {
+        sql "STOP ROUTINE LOAD FOR ${job4}"
     }
 
     // Test 5: Error case - fuzzy_parse not supported
@@ -358,17 +393,18 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         exception "Flexible partial update does not support COLUMNS specification"
     }
 
-    // Test 7: Error case - WHERE clause not supported
-    def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where_error"
-    def tableName7 = "test_routine_load_flex_update_where_error"
-    def job7 = "test_flex_partial_update_job_where_error"
+    // Test 7: Success case - WHERE clause works with flexible partial update
+    def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where"
+    def tableName7 = "test_routine_load_flex_update_where"
+    def job7 = "test_flex_partial_update_job_where"
 
     sql """ DROP TABLE IF EXISTS ${tableName7} force;"""
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName7} (
             `id` int NOT NULL,
             `name` varchar(65533) NULL,
-            `score` int NULL
+            `score` int NULL,
+            `age` int NULL
         ) ENGINE=OLAP
         UNIQUE KEY(`id`)
         DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -380,10 +416,21 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         );
     """
 
-    test {
+    // insert initial data
+    sql """
+        INSERT INTO ${tableName7} VALUES
+        (1, 'alice', 100, 20),
+        (2, 'bob', 90, 21),
+        (3, 'charlie', 80, 22)
+    """
+
+    qt_select_initial7 "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+
+    try {
+        // create routine load with WHERE clause and flexible partial update
         sql """
             CREATE ROUTINE LOAD ${job7} ON ${tableName7}
-            WHERE id > 0
+            WHERE id > 1
             PROPERTIES
             (
                 "max_batch_interval" = "10",
@@ -397,7 +444,31 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
                 "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
         """
-        exception "Flexible partial update does not support WHERE clause"
+
+        // send JSON data - WHERE clause filters id > 1, so id=1 row should NOT be processed
+        def data7 = [
+            '{"id": 1, "score": 999}',
+            '{"id": 2, "score": 95}',
+            '{"id": 3, "name": "chuck"}',
+            '{"id": 4, "name": "diana", "score": 70}'
+        ]
+
+        data7.each { line ->
+            logger.info("Sending to Kafka: ${line}")
+            def record = new ProducerRecord<>(kafkaJsonTopic7, null, line)
+            producer.send(record).get()
+        }
+        producer.flush()
+
+        RoutineLoadTestUtils.waitForTaskFinish(runSql, job7, tableName7, 3)
+
+        // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3,4 should be updated
+        qt_select_after_flex_where "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+    } catch (Exception e) {
+        logger.error("Error during test: " + e.getMessage())
+        throw e
+    } finally {
+        sql "STOP ROUTINE LOAD FOR ${job7}"
     }
 
     // Test 8: Error case - table without skip_bitmap column
@@ -868,17 +939,18 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         sql "STOP ROUTINE LOAD FOR ${job15}"
     }
 
-    // Test 16: ALTER to flex mode fails when using jsonpaths
-    def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
-    def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
-    def job16 = "test_alter_flex_jsonpaths_error_job"
+    // Test 16: ALTER to flex mode succeeds with jsonpaths
+    def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths"
+    def tableName16 = "test_routine_load_alter_flex_jsonpaths"
+    def job16 = "test_alter_flex_jsonpaths_job"
 
     sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName16} (
             `id` int NOT NULL,
             `name` varchar(65533) NULL,
-            `score` int NULL
+            `score` int NULL,
+            `age` int NULL
         ) ENGINE=OLAP
         UNIQUE KEY(`id`)
         DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -890,8 +962,17 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         );
     """
 
+    // insert initial data
+    sql """
+        INSERT INTO ${tableName16} VALUES
+        (1, 'alice', 100, 20),
+        (2, 'bob', 90, 21)
+    """
+
+    qt_select_initial16 "SELECT id, name, score, age FROM ${tableName16} ORDER BY id"
+
     try {
-        // create routine load with jsonpaths
+        // create routine load with jsonpaths (UPSERT mode)
         sql """
             CREATE ROUTINE LOAD ${job16} ON ${tableName16}
             PROPERTIES
@@ -910,17 +991,40 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
 
         sql "PAUSE ROUTINE LOAD FOR ${job16}"
 
-        // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because jsonpaths
-        test {
-            sql """
-                ALTER ROUTINE LOAD FOR ${job16}
-                PROPERTIES
-                (
-                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
-                );
-            """
-            exception "Flexible partial update does not support jsonpaths"
+        // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+        sql """
+            ALTER ROUTINE LOAD FOR ${job16}
+            PROPERTIES
+            (
+                "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+            );
+        """
+
+        // verify the property was changed
+        def res = sql "SHOW ROUTINE LOAD FOR ${job16}"
+        def jobProperties = res[0][11].toString()
+        logger.info("Altered routine load job properties: ${jobProperties}")
+        assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+        sql "RESUME ROUTINE LOAD FOR ${job16}"
+
+        // send JSON data
+        def data16 = [
+            '{"id": 1, "name": "alice_updated", "score": 150}',
+            '{"id": 3, "name": "charlie", "score": 80}'
+        ]
+
+        data16.each { line ->
+            logger.info("Sending to Kafka: ${line}")
+            def record = new ProducerRecord<>(kafkaJsonTopic16, null, line)
+            producer.send(record).get()
         }
+        producer.flush()
+
+        RoutineLoadTestUtils.waitForTaskFinish(runSql, job16, tableName16, 2)
+
+        // verify flexible partial update with jsonpaths after ALTER
+        qt_select_after_alter_flex_jsonpaths "SELECT id, name, score, age FROM ${tableName16} ORDER BY id"
     } catch (Exception e) {
         logger.error("Error during test: " + e.getMessage())
         throw e
@@ -988,17 +1092,18 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
         sql "STOP ROUTINE LOAD FOR ${job17}"
     }
 
-    // Test 18: ALTER to flex mode fails when WHERE clause is specified
-    def kafkaJsonTopic18 = "test_routine_load_alter_flex_where_error"
-    def tableName18 = "test_routine_load_alter_flex_where_error"
-    def job18 = "test_alter_flex_where_error_job"
+    // Test 18: ALTER to flex mode succeeds with WHERE clause
+    def kafkaJsonTopic18 = "test_routine_load_alter_flex_where"
+    def tableName18 = "test_routine_load_alter_flex_where"
+    def job18 = "test_alter_flex_where_job"
 
     sql """ DROP TABLE IF EXISTS ${tableName18} force;"""
     sql """
        CREATE TABLE IF NOT EXISTS ${tableName18} (
            `id` int NOT NULL,
            `name` varchar(65533) NULL,
-            `score` int NULL
+            `score` int NULL,
+            `age` int NULL
        ) ENGINE=OLAP
        UNIQUE KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -1010,11 +1115,20 @@ suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
        );
    """
 
+    // insert initial data
+    sql """
+        INSERT INTO ${tableName18} VALUES
+        (1, 'alice', 100, 20),
+        (2, 'bob', 90, 21)
+    """
+
+    qt_select_initial18 "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
+
     try {
-        // create routine load with WHERE clause
+        // create routine load with WHERE clause (UPSERT mode)
         sql """
             CREATE ROUTINE LOAD ${job18} ON ${tableName18}
-            WHERE id > 0
+            WHERE id > 1
             PROPERTIES
             (
                 "max_batch_interval" = "10",
@@ -1030,17 +1144,41 @@
         sql "PAUSE ROUTINE LOAD FOR ${job18}"
 
-        // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because WHERE clause
-        test {
-            sql """
-                ALTER ROUTINE LOAD FOR ${job18}
-                PROPERTIES
-                (
-                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
-                );
-            """
-            exception "Flexible partial update does not support WHERE clause"
+        // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+        sql """
+            ALTER ROUTINE LOAD FOR ${job18}
+            PROPERTIES
+            (
+                "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+            );
+        """
+
+        // verify the property was changed
+        def res = sql "SHOW ROUTINE LOAD FOR ${job18}"
+        def jobProperties = res[0][11].toString()
+        logger.info("Altered routine load job properties: ${jobProperties}")
+        assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+        sql "RESUME ROUTINE LOAD FOR ${job18}"
+
+        // send JSON data - WHERE clause filters id > 1
+        def data18 = [
+            '{"id": 1, "score": 999}',
+            '{"id": 2, "score": 95}',
+            '{"id": 3, "name": "charlie", "score": 80}'
+        ]
+
+        data18.each { line ->
+            logger.info("Sending to Kafka: ${line}")
+            def record = new ProducerRecord<>(kafkaJsonTopic18, null, line)
+            producer.send(record).get()
         }
+        producer.flush()
+
+        RoutineLoadTestUtils.waitForTaskFinish(runSql, job18, tableName18, 2)
+
+        // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3 should be updated
+        qt_select_after_alter_flex_where "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
     } catch (Exception e) {
         logger.error("Error during test: " + e.getMessage())
         throw e

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
