This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d25ff38578788e474b9f4f23eb67a59eff250446
Author: Yongqiang YANG <[email protected]>
AuthorDate: Wed Jan 14 16:14:15 2026 -0800

    fix: Resolve code review issues for flexible partial update
    
    1. Fix exception type mismatch in KafkaRoutineLoadJob.replayModifyProperties
       - Changed catch block from DdlException to UserException since
         modifyPropertiesInternal now throws UserException
    
    2. Fix setSchemaForPartialUpdate not being called for flexible partial update
       - Changed the condition from isPartialUpdate to a check for either the
         UPDATE_FIXED_COLUMNS or the UPDATE_FLEXIBLE_COLUMNS mode
       - Aligns with StreamLoadHandler behavior
    
    3. Update tests to verify that the WHERE clause and jsonpaths work with
       flexible partial update
       - Tests 4, 7, 16, 18 now verify these features work correctly
       - Added expected output for new success test cases
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   3 +-
 .../test_routine_load_flexible_partial_update.out  |  40 ++++
 ...est_routine_load_flexible_partial_update.groovy | 236 ++++++++++++++++-----
 4 files changed, 230 insertions(+), 51 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index ebb3ed0f7fb..7cd2a71d2f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -830,7 +830,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
             modifyPropertiesInternal(log.getJobProperties(), 
(KafkaDataSourceProperties) log.getDataSourceProperties());
-        } catch (DdlException e) {
+        } catch (UserException e) {
             // should not happen
             LOG.error("failed to replay modify kafka routine load job: {}", 
id, e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 4554a2c2066..2dfad5c52cd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1068,7 +1068,8 @@ public abstract class RoutineLoadJob
                 throw new UserException("txn does not exist: " + txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
-            if (isPartialUpdate) {
+            if (uniqueKeyUpdateMode == 
TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
+                    || uniqueKeyUpdateMode == 
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
                 txnState.setSchemaForPartialUpdate((OlapTable) table);
             }
 
diff --git 
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
 
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
index 51176ff3f19..fdcd03a5560 100644
--- 
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
+++ 
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -25,6 +25,28 @@
 3      1000    2000    3000    4000
 4      \N      9876    4444    1234
 
+-- !select_initial4 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_flex_jsonpaths --
+1      alice_updated   150     20
+2      bob_updated     95      21
+3      charlie 80      22
+4      diana   70      \N
+
+-- !select_initial7 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_flex_where --
+1      alice   100     20
+2      bob     95      21
+3      chuck   80      22
+4      diana   70      \N
+
 -- !select_initial11 --
 1      alice   100     20
 2      bob     90      21
@@ -47,6 +69,24 @@
 3      charlie 80      22
 4      diana   \N      \N
 
+-- !select_initial16 --
+1      alice   100     20
+2      bob     90      21
+
+-- !select_after_alter_flex_jsonpaths --
+1      alice_updated   150     20
+2      bob     90      21
+3      charlie 80      \N
+
+-- !select_initial18 --
+1      alice   100     20
+2      bob     90      21
+
+-- !select_after_alter_flex_where --
+1      alice   100     20
+2      bob     95      21
+3      charlie 80      \N
+
 -- !select_initial20 --
 1      alice   100     20
 2      bob     90      21
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 1d00f12adc5..8d8d08fb179 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -232,17 +232,18 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             exception "Flexible partial update only supports JSON format"
         }
 
-        // Test 4: Error case - jsonpaths not supported
-        def kafkaJsonTopic4 = 
"test_routine_load_flexible_partial_update_jsonpaths_error"
-        def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
-        def job4 = "test_flex_partial_update_job_jsonpaths_error"
+        // Test 4: Success case - jsonpaths works with flexible partial update
+        def kafkaJsonTopic4 = 
"test_routine_load_flexible_partial_update_jsonpaths"
+        def tableName4 = "test_routine_load_flex_update_jsonpaths"
+        def job4 = "test_flex_partial_update_job_jsonpaths"
 
         sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName4} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL
+                `score` int NULL,
+                `age` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -254,14 +255,25 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
-        test {
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName4} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial4 "SELECT id, name, score, age FROM ${tableName4} 
ORDER BY id"
+
+        try {
+            // create routine load with jsonpaths and flexible partial update
             sql """
                 CREATE ROUTINE LOAD ${job4} ON ${tableName4}
                 PROPERTIES
                 (
                     "max_batch_interval" = "10",
                     "format" = "json",
-                    "jsonpaths" = '["\\$.id", "\\$.name"]',
+                    "jsonpaths" = '["\\$.id", "\\$.name", "\\$.score"]',
                     "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
                 )
                 FROM KAFKA
@@ -271,7 +283,30 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
             """
-            exception "Flexible partial update does not support jsonpaths"
+
+            // send JSON data - jsonpaths extracts id, name, score (age is not 
in jsonpaths so remains unchanged)
+            def data4 = [
+                '{"id": 1, "name": "alice_updated", "score": 150}',
+                '{"id": 2, "name": "bob_updated", "score": 95}',
+                '{"id": 4, "name": "diana", "score": 70}'
+            ]
+
+            data4.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic4, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job4, tableName4, 3)
+
+            // verify flexible partial update with jsonpaths works
+            qt_select_after_flex_jsonpaths "SELECT id, name, score, age FROM 
${tableName4} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job4}"
         }
 
         // Test 5: Error case - fuzzy_parse not supported
@@ -358,17 +393,18 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             exception "Flexible partial update does not support COLUMNS 
specification"
         }
 
-        // Test 7: Error case - WHERE clause not supported
-        def kafkaJsonTopic7 = 
"test_routine_load_flexible_partial_update_where_error"
-        def tableName7 = "test_routine_load_flex_update_where_error"
-        def job7 = "test_flex_partial_update_job_where_error"
+        // Test 7: Success case - WHERE clause works with flexible partial 
update
+        def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where"
+        def tableName7 = "test_routine_load_flex_update_where"
+        def job7 = "test_flex_partial_update_job_where"
 
         sql """ DROP TABLE IF EXISTS ${tableName7} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName7} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL
+                `score` int NULL,
+                `age` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -380,10 +416,21 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
-        test {
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName7} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial7 "SELECT id, name, score, age FROM ${tableName7} 
ORDER BY id"
+
+        try {
+            // create routine load with WHERE clause and flexible partial 
update
             sql """
                 CREATE ROUTINE LOAD ${job7} ON ${tableName7}
-                WHERE id > 0
+                WHERE id > 1
                 PROPERTIES
                 (
                     "max_batch_interval" = "10",
@@ -397,7 +444,31 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
             """
-            exception "Flexible partial update does not support WHERE clause"
+
+            // send JSON data - WHERE clause filters id > 1, so id=1 row 
should NOT be processed
+            def data7 = [
+                '{"id": 1, "score": 999}',
+                '{"id": 2, "score": 95}',
+                '{"id": 3, "name": "chuck"}',
+                '{"id": 4, "name": "diana", "score": 70}'
+            ]
+
+            data7.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic7, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job7, tableName7, 3)
+
+            // verify: id=1 should NOT be updated (filtered by WHERE), 
id=2,3,4 should be updated
+            qt_select_after_flex_where "SELECT id, name, score, age FROM 
${tableName7} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job7}"
         }
 
         // Test 8: Error case - table without skip_bitmap column
@@ -868,17 +939,18 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             sql "STOP ROUTINE LOAD FOR ${job15}"
         }
 
-        // Test 16: ALTER to flex mode fails when using jsonpaths
-        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
-        def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
-        def job16 = "test_alter_flex_jsonpaths_error_job"
+        // Test 16: ALTER to flex mode succeeds with jsonpaths
+        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths"
+        def tableName16 = "test_routine_load_alter_flex_jsonpaths"
+        def job16 = "test_alter_flex_jsonpaths_job"
 
         sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName16} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL
+                `score` int NULL,
+                `age` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -890,8 +962,17 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName16} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial16 "SELECT id, name, score, age FROM ${tableName16} 
ORDER BY id"
+
         try {
-            // create routine load with jsonpaths
+            // create routine load with jsonpaths (UPSERT mode)
             sql """
                 CREATE ROUTINE LOAD ${job16} ON ${tableName16}
                 PROPERTIES
@@ -910,17 +991,40 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
 
             sql "PAUSE ROUTINE LOAD FOR ${job16}"
 
-            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail 
because jsonpaths
-            test {
-                sql """
-                    ALTER ROUTINE LOAD FOR ${job16}
-                    PROPERTIES
-                    (
-                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
-                    );
-                """
-                exception "Flexible partial update does not support jsonpaths"
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+            sql """
+                ALTER ROUTINE LOAD FOR ${job16}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job16}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+            sql "RESUME ROUTINE LOAD FOR ${job16}"
+
+            // send JSON data
+            def data16 = [
+                '{"id": 1, "name": "alice_updated", "score": 150}',
+                '{"id": 3, "name": "charlie", "score": 80}'
+            ]
+
+            data16.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic16, null, line)
+                producer.send(record).get()
             }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job16, tableName16, 
2)
+
+            // verify flexible partial update with jsonpaths after ALTER
+            qt_select_after_alter_flex_jsonpaths "SELECT id, name, score, age 
FROM ${tableName16} ORDER BY id"
         } catch (Exception e) {
             logger.error("Error during test: " + e.getMessage())
             throw e
@@ -988,17 +1092,18 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             sql "STOP ROUTINE LOAD FOR ${job17}"
         }
 
-        // Test 18: ALTER to flex mode fails when WHERE clause is specified
-        def kafkaJsonTopic18 = "test_routine_load_alter_flex_where_error"
-        def tableName18 = "test_routine_load_alter_flex_where_error"
-        def job18 = "test_alter_flex_where_error_job"
+        // Test 18: ALTER to flex mode succeeds with WHERE clause
+        def kafkaJsonTopic18 = "test_routine_load_alter_flex_where"
+        def tableName18 = "test_routine_load_alter_flex_where"
+        def job18 = "test_alter_flex_where_job"
 
         sql """ DROP TABLE IF EXISTS ${tableName18} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName18} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL
+                `score` int NULL,
+                `age` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -1010,11 +1115,20 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName18} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial18 "SELECT id, name, score, age FROM ${tableName18} 
ORDER BY id"
+
         try {
-            // create routine load with WHERE clause
+            // create routine load with WHERE clause (UPSERT mode)
             sql """
                 CREATE ROUTINE LOAD ${job18} ON ${tableName18}
-                WHERE id > 0
+                WHERE id > 1
                 PROPERTIES
                 (
                     "max_batch_interval" = "10",
@@ -1030,17 +1144,41 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
 
             sql "PAUSE ROUTINE LOAD FOR ${job18}"
 
-            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail 
because WHERE clause
-            test {
-                sql """
-                    ALTER ROUTINE LOAD FOR ${job18}
-                    PROPERTIES
-                    (
-                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
-                    );
-                """
-                exception "Flexible partial update does not support WHERE 
clause"
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+            sql """
+                ALTER ROUTINE LOAD FOR ${job18}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job18}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+            sql "RESUME ROUTINE LOAD FOR ${job18}"
+
+            // send JSON data - WHERE clause filters id > 1
+            def data18 = [
+                '{"id": 1, "score": 999}',
+                '{"id": 2, "score": 95}',
+                '{"id": 3, "name": "charlie", "score": 80}'
+            ]
+
+            data18.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic18, null, line)
+                producer.send(record).get()
             }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job18, tableName18, 
2)
+
+            // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3 
should be updated
+            qt_select_after_alter_flex_where "SELECT id, name, score, age FROM 
${tableName18} ORDER BY id"
         } catch (Exception e) {
             logger.error("Error during test: " + e.getMessage())
             throw e


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to