This is an automated email from the ASF dual-hosted git repository.

morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eeebf454f70 [fix](load) quote slot labels in routine-load legacy expr 
translation to avoid reserved-keyword parse failure (#63747)
eeebf454f70 is described below

commit eeebf454f70f116e7857e8bde0215e8d39fb109d
Author: York Cao <[email protected]>
AuthorDate: Wed Jun 3 14:45:50 2026 +0800

    [fix](load) quote slot labels in routine-load legacy expr translation to 
avoid reserved-keyword parse failure (#63747)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    When a routine load job uses a column name that is a SQL reserved
    keyword (e.g., `group`) in a PRECEDING FILTER clause, the
    Nereids-to-legacy expression translator sets the slot label as the raw
    name (e.g., `group`) without quoting. When the legacy
    expression SQL is later re-parsed (e.g., during routine load reparse via
    `NereidsLoadUtils.parseExpressionSeq`), the unquoted
    reserved keyword causes a parse failure, pausing the routine load job.
    
    This PR quotes the slot label using `SqlUtils.getIdentSql()` so that
    reserved-keyword column names are properly backtick-quoted in
    the translated legacy expression SQL, preventing the parse failure.
---
 .../org/apache/doris/common/util/SqlUtils.java     |  14 +++
 .../mv/InitMaterializationContextHook.java         |  26 ++---
 .../org/apache/doris/nereids/util/PlanUtils.java   |   3 +-
 .../apache/doris/nereids/util/PlanUtilsTest.java   |  23 ++++
 regression-test/conf/regression-conf.groovy        |   3 +-
 .../test_routine_load_advanced_mapping.groovy      |   2 +-
 .../data/test_preceding_filter_keyword.csv         |   3 +
 .../routine_load/test_routine_load_alter.groovy    |   4 +-
 .../test_routine_load_condition.groovy             |   2 +-
 ...st_routine_load_preceding_filter_keyword.groovy | 127 +++++++++++++++++++++
 10 files changed, 181 insertions(+), 26 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
index b5c6021833e..ff867e529b7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
@@ -33,6 +33,20 @@ import java.util.Collections;
 import java.util.List;
 
 public class SqlUtils {
+    public static String getIdentSql(String ident) {
+        StringBuilder sb = new StringBuilder();
+        sb.append('`');
+        for (char ch : ident.toCharArray()) {
+            if (ch == '`') {
+                sb.append("``");
+            } else {
+                sb.append(ch);
+            }
+        }
+        sb.append('`');
+        return sb.toString();
+    }
+
     public static String escapeQuota(String str) {
         if (Strings.isNullOrEmpty(str)) {
             return str;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index bc2c705a4f7..2852fdd2a68 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.mtmv.MTMVCache;
 import org.apache.doris.mtmv.MTMVPlanUtil;
@@ -340,7 +341,7 @@ public class InitMaterializationContextHook implements 
PlannerHook {
         StringBuilder createMvSqlBuilder = new StringBuilder();
         createMvSqlBuilder.append(String.format("create materialized view %s 
as select ", mvName));
         for (Column col : columns) {
-            createMvSqlBuilder.append(String.format("%s, ", 
getIdentSql(col.getName())));
+            createMvSqlBuilder.append(String.format("%s, ", 
SqlUtils.getIdentSql(col.getName())));
         }
         removeLastTwoChars(createMvSqlBuilder);
         createMvSqlBuilder.append(String.format(" from %s", baseTableName));
@@ -368,14 +369,14 @@ public class InitMaterializationContextHook implements 
PlannerHook {
                         case HLL_UNION:
                         case BITMAP_UNION:
                         case QUANTILE_UNION: {
-                            aggColumnsStringBuilder
-                                    .append(String.format("%s(%s), ", 
aggregateType, getIdentSql(col.getName())));
+                            aggColumnsStringBuilder.append(
+                                    String.format("%s(%s), ", aggregateType, 
SqlUtils.getIdentSql(col.getName())));
                             break;
                         }
                         case GENERIC: {
                             AggStateType aggStateType = (AggStateType) 
col.getType();
                             
aggColumnsStringBuilder.append(String.format("%s_union(%s), ",
-                                    aggStateType.getFunctionName(), 
getIdentSql(col.getName())));
+                                    aggStateType.getFunctionName(), 
SqlUtils.getIdentSql(col.getName())));
                             break;
                         }
                         default: {
@@ -389,7 +390,7 @@ public class InitMaterializationContextHook implements 
PlannerHook {
                     // use column name for key
                     Preconditions.checkState(col.isKey(),
                             String.format("%s must be key", col.getName()));
-                    keyColumnsStringBuilder.append(String.format("%s, ", 
getIdentSql(col.getName())));
+                    keyColumnsStringBuilder.append(String.format("%s, ", 
SqlUtils.getIdentSql(col.getName())));
                 }
             }
             Preconditions.checkState(keyColumnsStringBuilder.length() > 0,
@@ -409,7 +410,7 @@ public class InitMaterializationContextHook implements 
PlannerHook {
                     String.format(" from %s group by %s", baseTableName, 
keyColumnsStringBuilder));
         } else {
             for (Column col : columns) {
-                createMvSqlBuilder.append(String.format("%s, ", 
getIdentSql(col.getName())));
+                createMvSqlBuilder.append(String.format("%s, ", 
SqlUtils.getIdentSql(col.getName())));
             }
             removeLastTwoChars(createMvSqlBuilder);
             createMvSqlBuilder.append(String.format(" from %s", 
baseTableName));
@@ -424,17 +425,4 @@ public class InitMaterializationContextHook implements 
PlannerHook {
         }
     }
 
-    private static String getIdentSql(String ident) {
-        StringBuilder sb = new StringBuilder();
-        sb.append('`');
-        for (char ch : ident.toCharArray()) {
-            if (ch == '`') {
-                sb.append("``");
-            } else {
-                sb.append(ch);
-            }
-        }
-        sb.append('`');
-        return sb.toString();
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
index 0f8bb5e5ed1..11773fd4c03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.Scope;
@@ -482,7 +483,7 @@ public class PlanUtils {
         @Override
         public Expr visitSlotReference(SlotReference slotReference, 
PlanTranslatorContext context) {
             SlotRef slotRef = new 
SlotRef(slotReference.getDataType().toCatalogDataType(), 
slotReference.nullable());
-            slotRef.setLabel(slotReference.getName());
+            slotRef.setLabel(SqlUtils.getIdentSql(slotReference.getName()));
             slotRef.setCol(slotReference.getName());
             return slotRef;
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
index a13825a8854..e3d744f2e40 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
@@ -17,10 +17,20 @@
 
 package org.apache.doris.nereids.util;
 
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprToSqlVisitor;
+import org.apache.doris.analysis.ToSqlParams;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.load.NereidsLoadUtils;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.utframe.UtFrameUtils;
 
 import com.google.common.collect.ImmutableSet;
 import org.junit.jupiter.api.Assertions;
@@ -36,4 +46,17 @@ class PlanUtilsTest {
         Plan filter = 
PlanUtils.filterOrSelf(ImmutableSet.of(BooleanLiteral.TRUE), scan);
         Assertions.assertTrue(filter instanceof LogicalFilter);
     }
+
+    @Test
+    void translateToLegacyExprShouldQuoteReservedColumnForRoutineLoadReparse() 
throws Exception {
+        ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+        ctx.setStatementContext(new StatementContext(ctx, new 
OriginStatement("", 0)));
+        Expression expr = new NereidsParser().parseExpression("`group` is not 
null");
+
+        Expr legacyExpr = PlanUtils.translateToLegacyExpr(expr, null, ctx);
+        String exprSql = legacyExpr.accept(ExprToSqlVisitor.INSTANCE, 
ToSqlParams.WITHOUT_TABLE);
+
+        Assertions.assertTrue(exprSql.contains("`group`"));
+        Assertions.assertDoesNotThrow(() -> 
NereidsLoadUtils.parseExpressionSeq(exprSql));
+    }
 }
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 824b8d553d1..ec01b743813 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -302,7 +302,7 @@ hudiEmrCatalog = ""
 icebergS3TablesCatalog=""
 icebergS3TablesCatalogGlueRest=""
 
-// The path of the cert configuration file for the testing framework 
+// The path of the cert configuration file for the testing framework
 // is consistent with the path of the cert file for the cluster
 enableTLS=false
 tlsVerifyMode="strict"
@@ -332,7 +332,6 @@ hudiHmsPort=19083
 hudiMinioPort=19100
 hudiMinioAccessKey="minio"
 hudiMinioSecretKey="minio123"
-
 icebergDlfRestCatalog="'type' = 'iceberg', 'warehouse' = 
'new_dlf_iceberg_catalog', 'iceberg.catalog.type' = 'rest', 'iceberg.rest.uri' 
= 'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg', 
'iceberg.rest.sigv4-enabled' = 'true', 'iceberg.rest.signing-name' = 'DlfNext', 
'iceberg.rest.access-key-id' = 'ak', 'iceberg.rest.secret-access-key' = 'sk', 
'iceberg.rest.signing-region' = 'cn-beijing', 
'iceberg.rest.vended-credentials-enabled' = 'true', 'io-impl' = 
'org.apache.iceberg.rest.DlfFileIO', [...]
 
 // For python UDF test, set the runtime version of python, default: 3.8.10
diff --git 
a/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
 
b/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
index 9fb5c931b17..1b70dc96c41 100644
--- 
a/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
+++ 
b/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
@@ -176,7 +176,7 @@ suite("test_routine_load_advanced_mapping","p0") {
                 }
                 log.info("reason of state changed: 
${res[0][11].toString()}".toString())
                 def json = parseJson(res[0][11])
-                assertEquals("(k00 = 8)", json.whereExpr.toString())
+                assertEquals("(`k00` = 8)", json.whereExpr.toString())
                 break;
             }
             while (true) {
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
 
b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
new file mode 100644
index 00000000000..8cd94a98c43
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
@@ -0,0 +1,3 @@
+app_a|grp_a|msg_a
+app_b|grp_b|msg_b
+app_c|grp_c|msg_c
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
index e829f31ce9c..32571b5e29a 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
@@ -285,8 +285,8 @@ suite("test_routine_load_alter","p0") {
             assertEquals("true", json.num_as_string.toString())
             assertEquals("k00,k01,k02,k03,k04,k05", 
json.columnToColumnExpr.toString())
             assertEquals("','", json.column_separator.toString())
-            assertEquals("(CAST(k00 AS decimalv3(38,6)) = CAST(8 AS 
decimalv3(38,6)))", json.precedingFilter.toString())
-            assertEquals("(CAST(k00 AS decimalv3(38,6)) = CAST(8 AS 
decimalv3(38,6)))", json.whereExpr.toString())
+            assertEquals("(CAST(`k00` AS decimalv3(38,6)) = CAST(8 AS 
decimalv3(38,6)))", json.precedingFilter.toString())
+            assertEquals("(CAST(`k00` AS decimalv3(38,6)) = CAST(8 AS 
decimalv3(38,6)))", json.whereExpr.toString())
             assertEquals("p1", json.partitions.toString())
             assertEquals("k00", json.sequence_col.toString())
         } finally {
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 765d161b9a8..c3557683e8d 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -182,7 +182,7 @@ suite("test_routine_load_condition","p0") {
                 }
                 log.info("reason of state changed: 
${res[0][11].toString()}".toString())
                 def json = parseJson(res[0][11])
-                assertEquals("(k12 >= CAST(days_sub(current_date(), 2) AS 
datetimev2(0)))", json.whereExpr.toString())
+                assertEquals("(`k12` >= CAST(days_sub(current_date(), 2) AS 
datetimev2(0)))", json.whereExpr.toString())
                 break;
             }
             while (true) {
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
new file mode 100644
index 00000000000..557dd5f3d20
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_preceding_filter_keyword", "p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafkaPort = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def topicName = "test_preceding_filter_keyword"
+        def tableName = "test_routine_load_preceding_filter_keyword_tbl"
+        def jobName = "test_preceding_filter_keyword_job"
+
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${externalEnvIp}:${kafkaPort}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        def producer = new KafkaProducer<>(props)
+        try {
+            def txt = new 
File("""${context.file.parent}/data/${topicName}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                def record = new ProducerRecord<>(topicName, null, line)
+                producer.send(record)
+            }
+        } finally {
+            producer.close()
+        }
+
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE ${tableName} (
+                    app VARCHAR(255),
+                    `group` VARCHAR(255),
+                    msg VARCHAR(255)
+                )
+                DUPLICATE KEY(app)
+                DISTRIBUTED BY HASH(app) BUCKETS 1
+                PROPERTIES ("replication_num" = "1");
+            """
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(app, `group`, msg),
+                COLUMNS TERMINATED BY "|",
+                PRECEDING FILTER app IS NOT NULL AND `group` IS NOT NULL
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafkaPort}",
+                    "kafka_topic" = "${topicName}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            int retry = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                def reason = res[0][17].toString()
+                if (state == "RUNNING") {
+                    break
+                }
+                if (state == "PAUSED") {
+                    assertTrue(false, "routine load should not be paused, 
reason: ${reason}")
+                }
+                retry++
+                if (retry > 60) {
+                    assertTrue(false, "routine load should become RUNNING, 
current state: ${state}, reason: ${reason}")
+                }
+            }
+
+            retry = 0
+            while (true) {
+                sleep(1000)
+                def state = sql "show routine load for ${jobName}"
+                if (state[0][8].toString() == "PAUSED") {
+                    assertTrue(false, "routine load should keep running, 
reason: ${state[0][17].toString()}")
+                }
+                def cnt = sql "select count(*) from ${tableName}"
+                if (cnt[0][0] > 0) {
+                    break
+                }
+                retry++
+                if (retry > 60) {
+                    assertTrue(false, "routine load did not ingest data in 
time")
+                }
+            }
+        } finally {
+            try {
+                sql "stop routine load for ${jobName}"
+            } catch (Exception e) {
+                logger.info("stop routine load failed: 
${e.message}".toString())
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to