Copilot commented on code in PR #59896:
URL: https://github.com/apache/doris/pull/59896#discussion_r2692464307


##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java:
##########
@@ -1932,9 +1941,27 @@ public void gsonPostProcess() throws IOException {
         if (tableId == 0) {
             isMultiTable = true;
         }
+        // Process UNIQUE_KEY_UPDATE_MODE first to ensure correct backward compatibility
+        // with PARTIAL_COLUMNS (HashMap iteration order is not guaranteed)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            try {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.valueOf(modeValue.toUpperCase());
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            } catch (IllegalArgumentException e) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+            }
+        }
+        // Process remaining properties
         jobProperties.forEach((k, v) -> {
             if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
-                isPartialUpdate = Boolean.parseBoolean(v);
+                // Backward compatibility: if unique_key_update_mode is not set, use partial_columns
+                if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                    isPartialUpdate = Boolean.parseBoolean(v);
+                    if (isPartialUpdate) {
+                        uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+                    }

Review Comment:
   The gsonPostProcess logic doesn't handle the case where PARTIAL_COLUMNS is 'false' but uniqueKeyUpdateMode was previously UPDATE_FIXED_COLUMNS. When deserializing old metadata, if partial_columns=false is processed, it should reset uniqueKeyUpdateMode to UPSERT for consistency. Add an else branch to handle this case.
   ```suggestion
                   boolean partialColumns = Boolean.parseBoolean(v);
                   // Backward compatibility: if unique_key_update_mode is not set, use partial_columns
                   if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
                       isPartialUpdate = partialColumns;
                       if (isPartialUpdate) {
                           uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
                       }
                   } else if (!partialColumns
                           && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
                       // Backward compatibility: if partial_columns is false but unique_key_update_mode
                       // was UPDATE_FIXED_COLUMNS, reset to UPSERT for consistency
                       isPartialUpdate = false;
                       uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
   ```



##########
regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy:
##########
@@ -0,0 +1,1276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        // Test 1: Basic flexible partial update
+        def kafkaJsonTopic1 = "test_routine_load_flexible_partial_update_basic"
+        def tableName1 = "test_routine_load_flex_update_basic"
+        def job1 = "test_flex_partial_update_job_basic"
+
+        sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // verify skip bitmap column is enabled
+        def show_res = sql "show create table ${tableName1}"
+        assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName1} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22),
+            (4, 'david', 70, 23),
+            (5, 'eve', 60, 24)
+        """
+
+        qt_select_initial1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+
+        try {
+            // create routine load with flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic1}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            // Row 1: update only score for id=1
+            // Row 2: update only age for id=2
+            // Row 3: update both name and score for id=3
+            // Row 4: insert new row with only id and name
+            def data = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "age": 30}',
+                '{"id": 3, "name": "chuck", "score": 95}',
+                '{"id": 6, "name": "frank"}'
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic1, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 4)
+
+            // verify flexible partial update results
+            qt_select_after_flex_update1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job1}"
+        }
+
+        // Test 2: Flexible partial update with default values
+        def kafkaJsonTopic2 = "test_routine_load_flexible_partial_update_default"
+        def tableName2 = "test_routine_load_flex_update_default"
+        def job2 = "test_flex_partial_update_job_default"
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `id` int NOT NULL,
+                `v1` bigint NULL,
+                `v2` bigint NULL DEFAULT "9876",
+                `v3` bigint NOT NULL,
+                `v4` bigint NOT NULL DEFAULT "1234"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update with default values'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName2} VALUES
+            (1, 10, 20, 30, 40),
+            (2, 100, 200, 300, 400),
+            (3, 1000, 2000, 3000, 4000)
+        """
+
+        qt_select_initial2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic2}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            def data2 = [
+                '{"id": 1, "v1": 11}',
+                '{"id": 2, "v2": 222, "v3": 333}',
+                '{"id": 4, "v3": 4444}'
+            ]
+
+            data2.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic2, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 3)
+
+            qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job2}"
+        }
+
+        // Test 3: Error case - CSV format not supported
+        def kafkaCsvTopic3 = "test_routine_load_flexible_partial_update_csv_error"
+        def tableName3 = "test_routine_load_flex_update_csv_error"
+        def job3 = "test_flex_partial_update_job_csv_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName3} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job3} ON ${tableName3}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic3}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update only supports JSON format"
+        }
+
+        // Test 4: Error case - jsonpaths not supported
+        def kafkaJsonTopic4 = "test_routine_load_flexible_partial_update_jsonpaths_error"
+        def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
+        def job4 = "test_flex_partial_update_job_jsonpaths_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName4} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job4} ON ${tableName4}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = '["\\$.id", "\\$.name"]',
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic4}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support jsonpaths"
+        }
+
+        // Test 5: Error case - fuzzy_parse not supported
+        def kafkaJsonTopic5 = "test_routine_load_flexible_partial_update_fuzzy_error"
+        def tableName5 = "test_routine_load_flex_update_fuzzy_error"
+        def job5 = "test_flex_partial_update_job_fuzzy_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName5} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName5} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job5} ON ${tableName5}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "fuzzy_parse" = "true",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic5}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support fuzzy_parse"
+        }
+
+        // Test 6: Error case - COLUMNS clause not supported
+        def kafkaJsonTopic6 = "test_routine_load_flexible_partial_update_columns_error"
+        def tableName6 = "test_routine_load_flex_update_columns_error"
+        def job6 = "test_flex_partial_update_job_columns_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName6} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName6} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job6} ON ${tableName6}
+                COLUMNS (id, name, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic6}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support COLUMNS specification"
+        }
+
+        // Test 7: Error case - WHERE clause not supported
+        def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where_error"
+        def tableName7 = "test_routine_load_flex_update_where_error"
+        def job7 = "test_flex_partial_update_job_where_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName7} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName7} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job7} ON ${tableName7}
+                WHERE id > 0
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic7}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support WHERE clause"
+        }
+
+        // Test 8: Error case - table without skip_bitmap column
+        def kafkaJsonTopic8 = "test_routine_load_flexible_partial_update_no_skip_bitmap"
+        def tableName8 = "test_routine_load_flex_update_no_skip_bitmap"
+        def job8 = "test_flex_partial_update_job_no_skip_bitmap"
+
+        sql """ DROP TABLE IF EXISTS ${tableName8} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName8} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "false"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job8} ON ${tableName8}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic8}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update requires table with skip bitmap hidden column"
+        }
+
+        // Test 9: Error case - table with variant column
+        def kafkaJsonTopic9 = "test_routine_load_flexible_partial_update_variant"
+        def tableName9 = "test_routine_load_flex_update_variant"
+        def job9 = "test_flex_partial_update_job_variant"
+
+        sql """ DROP TABLE IF EXISTS ${tableName9} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName9} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `data` variant NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job9} ON ${tableName9}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic9}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support tables with variant columns"
+        }
+
+        // Test 10: Error case - invalid unique_key_update_mode value
+        def kafkaJsonTopic10 = "test_routine_load_flexible_partial_update_invalid_mode"
+        def tableName10 = "test_routine_load_flex_update_invalid_mode"
+        def job10 = "test_flex_partial_update_job_invalid_mode"
+
+        sql """ DROP TABLE IF EXISTS ${tableName10} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName10} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job10} ON ${tableName10}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "INVALID_MODE"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic10}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "unique_key_update_mode should be one of"
+        }
+
+        // Test 11: UPDATE_FIXED_COLUMNS mode (backward compatibility)
+        def kafkaJsonTopic11 = "test_routine_load_fixed_columns_mode"
+        def tableName11 = "test_routine_load_fixed_columns_mode"
+        def job11 = "test_fixed_columns_mode_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName11} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName11} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName11} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial11 "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+
+        try {
+            // create routine load with UPDATE_FIXED_COLUMNS mode
+            sql """
+                CREATE ROUTINE LOAD ${job11} ON ${tableName11}
+                COLUMNS (id, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic11}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def data11 = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "score": 95}',
+                '{"id": 4, "score": 85}'
+            ]
+
+            data11.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic11, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job11, tableName11, 3)
+
+            qt_select_after_fixed_update "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job11}"
+        }
+
+        // Test 12: ALTER ROUTINE LOAD to change unique_key_update_mode
+        def kafkaJsonTopic12 = "test_routine_load_alter_flex_mode"
+        def tableName12 = "test_routine_load_alter_flex_mode"
+        def job12 = "test_alter_flex_mode_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName12} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName12} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName12} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial12 "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+
+        try {
+            // create routine load with UPSERT mode (default)
+            sql """
+                CREATE ROUTINE LOAD ${job12} ON ${tableName12}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic12}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // pause the job before altering
+            sql "PAUSE ROUTINE LOAD FOR ${job12}"
+
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode
+            sql """
+                ALTER ROUTINE LOAD FOR ${job12}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job12}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: ${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+            // resume the job
+            sql "RESUME ROUTINE LOAD FOR ${job12}"
+
+            // send JSON data with different columns per row
+            def data12 = [
+                '{"id": 1, "score": 200}',
+                '{"id": 2, "age": 35}',
+                '{"id": 4, "name": "diana"}'
+            ]
+
+            data12.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic12, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job12, tableName12, 3)
+
+            // verify flexible partial update results after alter
+            qt_select_after_alter_flex "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job12}"
+        }
+
+        // Test 13: ALTER ROUTINE LOAD - error when trying to change to flex mode with invalid settings
+        def kafkaJsonTopic13 = "test_routine_load_alter_flex_error"
+        def tableName13 = "test_routine_load_alter_flex_error"
+        def job13 = "test_alter_flex_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName13} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName13} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "false"
+            );
+        """
+
+        try {
+            // create routine load with UPSERT mode (default)
+            sql """
+                CREATE ROUTINE LOAD ${job13} ON ${tableName13}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic13}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // pause the job before altering
+            sql "PAUSE ROUTINE LOAD FOR ${job13}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because table doesn't have skip_bitmap
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job13}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update requires table with skip bitmap hidden column"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job13}"
+        }
+
+        // Test 14: ALTER to flex mode fails when using CSV format
+        def kafkaJsonTopic14 = "test_routine_load_alter_flex_csv_error"
+        def tableName14 = "test_routine_load_alter_flex_csv_error"
+        def job14 = "test_alter_flex_csv_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName14} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName14} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with CSV format
+            sql """
+                CREATE ROUTINE LOAD ${job14} ON ${tableName14}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "csv"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic14}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job14}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because CSV format
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job14}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update only supports JSON format"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job14}"
+        }
+
+        // Test 15: ALTER to flex mode fails when using fuzzy_parse
+        def kafkaJsonTopic15 = "test_routine_load_alter_flex_fuzzy_error"
+        def tableName15 = "test_routine_load_alter_flex_fuzzy_error"
+        def job15 = "test_alter_flex_fuzzy_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName15} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName15} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with fuzzy_parse enabled
+            sql """
+                CREATE ROUTINE LOAD ${job15} ON ${tableName15}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "fuzzy_parse" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic15}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job15}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because fuzzy_parse
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job15}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support fuzzy_parse"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job15}"
+        }
+
+        // Test 16: ALTER to flex mode fails when using jsonpaths
+        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def job16 = "test_alter_flex_jsonpaths_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with jsonpaths
+            sql """
+                CREATE ROUTINE LOAD ${job16} ON ${tableName16}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = "[\\"\\$.id\\", \\"\\$.name\\", \\"\\$.score\\"]"

Review Comment:
   The excessive escaping in the jsonpaths string makes it hard to read. Consider using single quotes for the outer string or triple-quoted strings in Groovy to improve readability: "jsonpaths" = '["$.id", "$.name", "$.score"]'
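
   For comparison, a sketch of the job16 CREATE statement in the reviewer's suggested style, mirroring the single-quote form Test 4 already uses. Inside Groovy's triple-double-quoted block only the GString dollar sign needs escaping (`\$`), so the value stays readable; the Kafka clause below is completed from the pattern used throughout this suite, since the diff is truncated here:

   ```groovy
   // Hypothetical rewrite of the job16 CREATE statement; only the quoting of
   // the jsonpaths value changes relative to the diff above.
   sql """
       CREATE ROUTINE LOAD ${job16} ON ${tableName16}
       PROPERTIES
       (
           "max_batch_interval" = "10",
           "format" = "json",
           "jsonpaths" = '["\$.id", "\$.name", "\$.score"]'
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "${kafka_broker}",
           "kafka_topic" = "${kafkaJsonTopic16}",
           "property.kafka_default_offsets" = "OFFSET_BEGINNING"
       );
   """
   ```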



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java:
##########
@@ -532,11 +620,30 @@ public void checkJobProperties() throws UserException {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
 
+        // check unique_key_update_mode
+        if (jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(UNIQUE_KEY_UPDATE_MODE).toUpperCase();
+            if (!"UPSERT".equals(modeStr) && !"UPDATE_FIXED_COLUMNS".equals(modeStr)
+                    && !"UPDATE_FLEXIBLE_COLUMNS".equals(modeStr)) {
+                throw new AnalysisException(UNIQUE_KEY_UPDATE_MODE
+                        + " should be one of {'UPSERT', 'UPDATE_FIXED_COLUMNS', 'UPDATE_FLEXIBLE_COLUMNS'}, but found "
+                        + modeStr);
+            }
+            // Check for conflicting settings: partial_columns=true with unique_key_update_mode=UPSERT
+            if (jobProperties.containsKey(PARTIAL_COLUMNS)
+                    && jobProperties.get(PARTIAL_COLUMNS).equalsIgnoreCase("true")
+                    && "UPSERT".equals(modeStr)) {
+                throw new AnalysisException("Cannot set both 'partial_columns=true' and "
+                        + "'unique_key_update_mode=UPSERT'. "
+                        + "Use unique_key_update_mode=UPDATE_FIXED_COLUMNS instead.");

Review Comment:
   This validation only checks for the conflicting case (partial_columns=true with UPSERT) but doesn't check for partial_columns=false with UPDATE_FIXED_COLUMNS or UPDATE_FLEXIBLE_COLUMNS, which is also conflicting. Add validation for these cases as well to ensure consistency.
   ```suggestion
            // Check for conflicting settings between partial_columns and unique_key_update_mode
            if (jobProperties.containsKey(PARTIAL_COLUMNS)) {
                String partialColumnsVal = jobProperties.get(PARTIAL_COLUMNS);
                // partial_columns=true is not allowed with UPSERT
                if (partialColumnsVal.equalsIgnoreCase("true")
                        && "UPSERT".equals(modeStr)) {
                    throw new AnalysisException("Cannot set both 'partial_columns=true' and "
                            + "'unique_key_update_mode=UPSERT'. "
                            + "Use unique_key_update_mode=UPDATE_FIXED_COLUMNS instead.");
                }
                // partial_columns=false is not allowed with UPDATE_FIXED_COLUMNS or UPDATE_FLEXIBLE_COLUMNS
                if (partialColumnsVal.equalsIgnoreCase("false")
                        && ("UPDATE_FIXED_COLUMNS".equals(modeStr)
                            || "UPDATE_FLEXIBLE_COLUMNS".equals(modeStr))) {
                    throw new AnalysisException("Cannot set 'partial_columns=false' when "
                            + "'unique_key_update_mode' is 'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'. "
                            + "Use unique_key_update_mode=UPSERT instead, or enable partial columns.");
                }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java:
##########
@@ -198,8 +202,34 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName,
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
             .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
-        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
-        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+        // Parse unique_key_update_mode first (takes precedence)
+        if (this.jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = this.jobProperties.get(UNIQUE_KEY_UPDATE_MODE).toUpperCase();
+            switch (modeStr) {
+                case "UPSERT":
+                    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+                    break;
+                case "UPDATE_FIXED_COLUMNS":
+                    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+                    this.isPartialUpdate = true;
+                    break;
+                case "UPDATE_FLEXIBLE_COLUMNS":
+                    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+                    break;
+                default:
+                    // validation will be done in checkJobProperties()
+                    break;
+            }
+        } else {
+            // Backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
+            this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+            if (this.isPartialUpdate) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }

Review Comment:
   When both partial_columns and unique_key_update_mode are specified in jobProperties, the order of processing matters due to HashMap iteration. Consider adding validation in checkJobProperties to prevent users from specifying conflicting combinations like partial_columns=false with unique_key_update_mode=UPDATE_FIXED_COLUMNS.
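
   A compact sketch of such a check, assuming it sits beside the existing validation in checkJobProperties (a fuller variant appears in the suggestion on checkJobProperties above):

   ```java
   // Illustrative only: fail fast on contradictory settings so the order in
   // which the constructor reads the two properties can no longer matter.
   if (jobProperties.containsKey(PARTIAL_COLUMNS) && jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
       boolean partialColumns = jobProperties.get(PARTIAL_COLUMNS).equalsIgnoreCase("true");
       String mode = jobProperties.get(UNIQUE_KEY_UPDATE_MODE).toUpperCase();
       if (!partialColumns && ("UPDATE_FIXED_COLUMNS".equals(mode) || "UPDATE_FLEXIBLE_COLUMNS".equals(mode))) {
           throw new AnalysisException("Cannot set 'partial_columns=false' when unique_key_update_mode is " + mode);
       }
   }
   ```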



##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java:
##########
@@ -2010,5 +2037,104 @@ protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
             this.maxBatchSizeBytes = Long.parseLong(
                     jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
         }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE).toUpperCase();
+            TUniqueKeyUpdateMode newMode = TUniqueKeyUpdateMode.valueOf(modeStr);
+            // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
+            if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+                validateFlexiblePartialUpdateForAlter();
+            }
+            this.uniqueKeyUpdateMode = newMode;
+            this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+            this.isPartialUpdate = Boolean.parseBoolean(
+                    jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
+            if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;

Review Comment:
   When PARTIAL_COLUMNS is set to false but uniqueKeyUpdateMode is already UPDATE_FIXED_COLUMNS (e.g., from a previous ALTER), this code doesn't reset uniqueKeyUpdateMode back to UPSERT. This could leave the job in an inconsistent state. Add: 'else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; }'
   ```suggestion
                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
            } else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
   ```


