This is an automated email from the ASF dual-hosted git repository.
morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eeebf454f70 [fix](load) quote slot labels in routine-load legacy expr
translation to avoid reserved-keyword parse failure (#63747)
eeebf454f70 is described below
commit eeebf454f70f116e7857e8bde0215e8d39fb109d
Author: York Cao <[email protected]>
AuthorDate: Wed Jun 3 14:45:50 2026 +0800
[fix](load) quote slot labels in routine-load legacy expr translation to
avoid reserved-keyword parse failure (#63747)
### What problem does this PR solve?
Problem Summary:
When a routine load job references a column whose name is a SQL reserved
keyword (e.g., `group`) in a PRECEDING FILTER clause, the
Nereids-to-legacy expression translator sets the slot label to the raw
column name (e.g., `group`) without quoting it. When the legacy
expression SQL is later re-parsed (e.g., during routine load reparse via
`NereidsLoadUtils.parseExpressionSeq`), the unquoted
reserved keyword causes a parse failure, which pauses the routine load job.
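This PR quotes the slot label with `SqlUtils.getIdentSql()` so that
reserved-keyword column names are backtick-quoted in the translated
legacy expression SQL, preventing the parse failure. The helper is moved
from `InitMaterializationContextHook` into `SqlUtils` so that both call
sites share one implementation. Because every slot label is now quoted
(not only reserved keywords), the `whereExpr`/`precedingFilter` strings
shown for routine load jobs change from, e.g., `(k00 = 8)` to
`` (`k00` = 8) ``; the affected regression expectations are updated
accordingly.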
---
.../org/apache/doris/common/util/SqlUtils.java | 14 +++
.../mv/InitMaterializationContextHook.java | 26 ++---
.../org/apache/doris/nereids/util/PlanUtils.java | 3 +-
.../apache/doris/nereids/util/PlanUtilsTest.java | 23 ++++
regression-test/conf/regression-conf.groovy | 3 +-
.../test_routine_load_advanced_mapping.groovy | 2 +-
.../data/test_preceding_filter_keyword.csv | 3 +
.../routine_load/test_routine_load_alter.groovy | 4 +-
.../test_routine_load_condition.groovy | 2 +-
...st_routine_load_preceding_filter_keyword.groovy | 127 +++++++++++++++++++++
10 files changed, 181 insertions(+), 26 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
b/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
index b5c6021833e..ff867e529b7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/util/SqlUtils.java
@@ -33,6 +33,20 @@ import java.util.Collections;
import java.util.List;
public class SqlUtils {
+ public static String getIdentSql(String ident) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('`');
+ for (char ch : ident.toCharArray()) {
+ if (ch == '`') {
+ sb.append("``");
+ } else {
+ sb.append(ch);
+ }
+ }
+ sb.append('`');
+ return sb.toString();
+ }
+
public static String escapeQuota(String str) {
if (Strings.isNullOrEmpty(str)) {
return str;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index bc2c705a4f7..2852fdd2a68 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPlanUtil;
@@ -340,7 +341,7 @@ public class InitMaterializationContextHook implements
PlannerHook {
StringBuilder createMvSqlBuilder = new StringBuilder();
createMvSqlBuilder.append(String.format("create materialized view %s
as select ", mvName));
for (Column col : columns) {
- createMvSqlBuilder.append(String.format("%s, ",
getIdentSql(col.getName())));
+ createMvSqlBuilder.append(String.format("%s, ",
SqlUtils.getIdentSql(col.getName())));
}
removeLastTwoChars(createMvSqlBuilder);
createMvSqlBuilder.append(String.format(" from %s", baseTableName));
@@ -368,14 +369,14 @@ public class InitMaterializationContextHook implements
PlannerHook {
case HLL_UNION:
case BITMAP_UNION:
case QUANTILE_UNION: {
- aggColumnsStringBuilder
- .append(String.format("%s(%s), ",
aggregateType, getIdentSql(col.getName())));
+ aggColumnsStringBuilder.append(
+ String.format("%s(%s), ", aggregateType,
SqlUtils.getIdentSql(col.getName())));
break;
}
case GENERIC: {
AggStateType aggStateType = (AggStateType)
col.getType();
aggColumnsStringBuilder.append(String.format("%s_union(%s), ",
- aggStateType.getFunctionName(),
getIdentSql(col.getName())));
+ aggStateType.getFunctionName(),
SqlUtils.getIdentSql(col.getName())));
break;
}
default: {
@@ -389,7 +390,7 @@ public class InitMaterializationContextHook implements
PlannerHook {
// use column name for key
Preconditions.checkState(col.isKey(),
String.format("%s must be key", col.getName()));
- keyColumnsStringBuilder.append(String.format("%s, ",
getIdentSql(col.getName())));
+ keyColumnsStringBuilder.append(String.format("%s, ",
SqlUtils.getIdentSql(col.getName())));
}
}
Preconditions.checkState(keyColumnsStringBuilder.length() > 0,
@@ -409,7 +410,7 @@ public class InitMaterializationContextHook implements
PlannerHook {
String.format(" from %s group by %s", baseTableName,
keyColumnsStringBuilder));
} else {
for (Column col : columns) {
- createMvSqlBuilder.append(String.format("%s, ",
getIdentSql(col.getName())));
+ createMvSqlBuilder.append(String.format("%s, ",
SqlUtils.getIdentSql(col.getName())));
}
removeLastTwoChars(createMvSqlBuilder);
createMvSqlBuilder.append(String.format(" from %s",
baseTableName));
@@ -424,17 +425,4 @@ public class InitMaterializationContextHook implements
PlannerHook {
}
}
- private static String getIdentSql(String ident) {
- StringBuilder sb = new StringBuilder();
- sb.append('`');
- for (char ch : ident.toCharArray()) {
- if (ch == '`') {
- sb.append("``");
- } else {
- sb.append(ch);
- }
- }
- sb.append('`');
- return sb.toString();
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
index 0f8bb5e5ed1..11773fd4c03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.Scope;
@@ -482,7 +483,7 @@ public class PlanUtils {
@Override
public Expr visitSlotReference(SlotReference slotReference,
PlanTranslatorContext context) {
SlotRef slotRef = new
SlotRef(slotReference.getDataType().toCatalogDataType(),
slotReference.nullable());
- slotRef.setLabel(slotReference.getName());
+ slotRef.setLabel(SqlUtils.getIdentSql(slotReference.getName()));
slotRef.setCol(slotReference.getName());
return slotRef;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
index a13825a8854..e3d744f2e40 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanUtilsTest.java
@@ -17,10 +17,20 @@
package org.apache.doris.nereids.util;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprToSqlVisitor;
+import org.apache.doris.analysis.ToSqlParams;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.load.NereidsLoadUtils;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.utframe.UtFrameUtils;
import com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.Assertions;
@@ -36,4 +46,17 @@ class PlanUtilsTest {
Plan filter =
PlanUtils.filterOrSelf(ImmutableSet.of(BooleanLiteral.TRUE), scan);
Assertions.assertTrue(filter instanceof LogicalFilter);
}
+
+ @Test
+ void translateToLegacyExprShouldQuoteReservedColumnForRoutineLoadReparse()
throws Exception {
+ ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+ ctx.setStatementContext(new StatementContext(ctx, new
OriginStatement("", 0)));
+ Expression expr = new NereidsParser().parseExpression("`group` is not
null");
+
+ Expr legacyExpr = PlanUtils.translateToLegacyExpr(expr, null, ctx);
+ String exprSql = legacyExpr.accept(ExprToSqlVisitor.INSTANCE,
ToSqlParams.WITHOUT_TABLE);
+
+ Assertions.assertTrue(exprSql.contains("`group`"));
+ Assertions.assertDoesNotThrow(() ->
NereidsLoadUtils.parseExpressionSeq(exprSql));
+ }
}
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index 824b8d553d1..ec01b743813 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -302,7 +302,7 @@ hudiEmrCatalog = ""
icebergS3TablesCatalog=""
icebergS3TablesCatalogGlueRest=""
-// The path of the cert configuration file for the testing framework
+// The path of the cert configuration file for the testing framework
// is consistent with the path of the cert file for the cluster
enableTLS=false
tlsVerifyMode="strict"
@@ -332,7 +332,6 @@ hudiHmsPort=19083
hudiMinioPort=19100
hudiMinioAccessKey="minio"
hudiMinioSecretKey="minio123"
-
icebergDlfRestCatalog="'type' = 'iceberg', 'warehouse' =
'new_dlf_iceberg_catalog', 'iceberg.catalog.type' = 'rest', 'iceberg.rest.uri'
= 'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg',
'iceberg.rest.sigv4-enabled' = 'true', 'iceberg.rest.signing-name' = 'DlfNext',
'iceberg.rest.access-key-id' = 'ak', 'iceberg.rest.secret-access-key' = 'sk',
'iceberg.rest.signing-region' = 'cn-beijing',
'iceberg.rest.vended-credentials-enabled' = 'true', 'io-impl' =
'org.apache.iceberg.rest.DlfFileIO', [...]
// For python UDF test, set the runtime version of python, default: 3.8.10
diff --git
a/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
b/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
index 9fb5c931b17..1b70dc96c41 100644
---
a/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
+++
b/regression-test/suites/load_p0/load_ddl/test_routine_load_advanced_mapping.groovy
@@ -176,7 +176,7 @@ suite("test_routine_load_advanced_mapping","p0") {
}
log.info("reason of state changed:
${res[0][11].toString()}".toString())
def json = parseJson(res[0][11])
- assertEquals("(k00 = 8)", json.whereExpr.toString())
+ assertEquals("(`k00` = 8)", json.whereExpr.toString())
break;
}
while (true) {
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
new file mode 100644
index 00000000000..8cd94a98c43
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/data/test_preceding_filter_keyword.csv
@@ -0,0 +1,3 @@
+app_a|grp_a|msg_a
+app_b|grp_b|msg_b
+app_c|grp_c|msg_c
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
index e829f31ce9c..32571b5e29a 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
@@ -285,8 +285,8 @@ suite("test_routine_load_alter","p0") {
assertEquals("true", json.num_as_string.toString())
assertEquals("k00,k01,k02,k03,k04,k05",
json.columnToColumnExpr.toString())
assertEquals("','", json.column_separator.toString())
- assertEquals("(CAST(k00 AS decimalv3(38,6)) = CAST(8 AS
decimalv3(38,6)))", json.precedingFilter.toString())
- assertEquals("(CAST(k00 AS decimalv3(38,6)) = CAST(8 AS
decimalv3(38,6)))", json.whereExpr.toString())
+ assertEquals("(CAST(`k00` AS decimalv3(38,6)) = CAST(8 AS
decimalv3(38,6)))", json.precedingFilter.toString())
+ assertEquals("(CAST(`k00` AS decimalv3(38,6)) = CAST(8 AS
decimalv3(38,6)))", json.whereExpr.toString())
assertEquals("p1", json.partitions.toString())
assertEquals("k00", json.sequence_col.toString())
} finally {
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 765d161b9a8..c3557683e8d 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -182,7 +182,7 @@ suite("test_routine_load_condition","p0") {
}
log.info("reason of state changed:
${res[0][11].toString()}".toString())
def json = parseJson(res[0][11])
- assertEquals("(k12 >= CAST(days_sub(current_date(), 2) AS
datetimev2(0)))", json.whereExpr.toString())
+ assertEquals("(`k12` >= CAST(days_sub(current_date(), 2) AS
datetimev2(0)))", json.whereExpr.toString())
break;
}
while (true) {
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
new file mode 100644
index 00000000000..557dd5f3d20
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_preceding_filter_keyword.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_preceding_filter_keyword", "p0") {
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafkaPort = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def topicName = "test_preceding_filter_keyword"
+ def tableName = "test_routine_load_preceding_filter_keyword_tbl"
+ def jobName = "test_preceding_filter_keyword_job"
+
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${externalEnvIp}:${kafkaPort}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ def producer = new KafkaProducer<>(props)
+ try {
+ def txt = new
File("""${context.file.parent}/data/${topicName}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ def record = new ProducerRecord<>(topicName, null, line)
+ producer.send(record)
+ }
+ } finally {
+ producer.close()
+ }
+
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ app VARCHAR(255),
+ `group` VARCHAR(255),
+ msg VARCHAR(255)
+ )
+ DUPLICATE KEY(app)
+ DISTRIBUTED BY HASH(app) BUCKETS 1
+ PROPERTIES ("replication_num" = "1");
+ """
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS(app, `group`, msg),
+ COLUMNS TERMINATED BY "|",
+ PRECEDING FILTER app IS NOT NULL AND `group` IS NOT NULL
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafkaPort}",
+ "kafka_topic" = "${topicName}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ int retry = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ def reason = res[0][17].toString()
+ if (state == "RUNNING") {
+ break
+ }
+ if (state == "PAUSED") {
+ assertTrue(false, "routine load should not be paused,
reason: ${reason}")
+ }
+ retry++
+ if (retry > 60) {
+ assertTrue(false, "routine load should become RUNNING,
current state: ${state}, reason: ${reason}")
+ }
+ }
+
+ retry = 0
+ while (true) {
+ sleep(1000)
+ def state = sql "show routine load for ${jobName}"
+ if (state[0][8].toString() == "PAUSED") {
+ assertTrue(false, "routine load should keep running,
reason: ${state[0][17].toString()}")
+ }
+ def cnt = sql "select count(*) from ${tableName}"
+ if (cnt[0][0] > 0) {
+ break
+ }
+ retry++
+ if (retry > 60) {
+ assertTrue(false, "routine load did not ingest data in
time")
+ }
+ }
+ } finally {
+ try {
+ sql "stop routine load for ${jobName}"
+ } catch (Exception e) {
+ logger.info("stop routine load failed:
${e.message}".toString())
+ }
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]