This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ce71c3497f4 [fix](load) Fix src slot mapping issue when file columns are not explicitly specified (#56041)
ce71c3497f4 is described below

commit ce71c3497f49ca074ffbaf4988835c9215ee6b35
Author: Xin Liao <[email protected]>
AuthorDate: Tue Sep 16 12:38:43 2025 +0800

    [fix](load) Fix src slot mapping issue when file columns are not explicitly specified (#56041)
    
    When users specify only SET expressions (e.g., 'kd01=20230102') without
    an explicit file column mapping, the system incorrectly auto-generates
    file columns that include columns meant only for SET expressions.

    Root cause:
    - When specifyFileFieldNames=false, the system auto-generates file
      columns for all table columns.
    - However, columns with SET expressions (stored in userMappingColumns)
      should be skipped during auto-generation, since they do not exist in
      the source file.
    - The missing skip logic caused these SET-only columns to be treated
      as file columns.

    Issues caused:
    1. BE reports 'failed to find default value expr for slot' when SET
       columns are missing from the source file.
    2. Incorrect slot mapping, leading to data quality errors.
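
    For illustration, a load of the following shape (label, bucket, and
    table name are hypothetical placeholders) hits this path: no file
    column list is given, only a constant SET expression:

        LOAD LABEL example_label (
            DATA INFILE("s3://bucket/basic_data.csv")
            INTO TABLE tbl
            COLUMNS TERMINATED BY "|"
            SET (kd01 = '20230102')
        )
        WITH S3 (...);

    Before the fix, kd01 was auto-generated as a file column; since it
    does not exist in the CSV, BE reported 'failed to find default value
    expr for slot'.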
---
 .../nereids/load/NereidsLoadScanProvider.java      |  17 +-
 .../load_p0/broker_load/test_s3_load_with_set.out  | Bin 0 -> 224 bytes
 .../data/load_p0/stream_load/test_bitmap.csv       |   1 +
 .../stream_load/test_stream_load_bitmap.out        | Bin 0 -> 115 bytes
 .../stream_load/test_stream_load_with_set.out      | Bin 0 -> 113 bytes
 .../broker_load/test_s3_load_with_set.groovy       | 218 +++++++++++++++++++++
 .../stream_load/test_stream_load_bitmap.groovy     |  54 +++++
 .../stream_load/test_stream_load_with_set.groovy   |  96 +++++++++
 8 files changed, 385 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
index 70f44da7d83..32d8262a0f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
@@ -62,6 +62,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -164,11 +165,20 @@ public class NereidsLoadScanProvider {
         //          (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
         //     so "tmpk3 = k1 + k2" is not needed anymore, we can skip it.
         List<NereidsImportColumnDesc> copiedColumnExprs = new ArrayList<>(columnDescs.size());
+        Set<String> constantMappingColumns = new HashSet<>();
         for (NereidsImportColumnDesc importColumnDesc : columnDescs) {
             String mappingColumnName = importColumnDesc.getColumnName();
-            if (importColumnDesc.isColumn() || tbl.getColumn(mappingColumnName) != null) {
+            if (importColumnDesc.isColumn()) {
                 copiedColumnExprs.add(importColumnDesc);
+            } else if (tbl.getColumn(mappingColumnName) != null) {
+                copiedColumnExprs.add(importColumnDesc);
+                // Only track columns with constant expressions (e.g., "k1 = 'constant'")
+                // Non-constant expressions (e.g., "k1 = k1 + 1") still need to read from file
+                if (importColumnDesc.getExpr().isConstant()) {
+                    constantMappingColumns.add(mappingColumnName);
+                }
             }
+            // Skip mapping columns that don't exist in table schema
         }
 
         // check whether the OlapTable has sequenceCol and skipBitmapCol
@@ -188,6 +198,11 @@ public class NereidsLoadScanProvider {
         if (!specifyFileFieldNames) {
             List<Column> columns = tbl.getBaseSchema(false);
             for (Column column : columns) {
+                if (constantMappingColumns.contains(column.getName())) {
+                    // Skip this column because the user has already specified a constant mapping
+                    // expression for it in the COLUMNS parameter (e.g., "column_name = 'constant_value'")
+                    continue;
+                }
                 NereidsImportColumnDesc columnDesc;
                 if (fileGroup.getFileFormatProperties().getFileFormatType() == TFileFormatType.FORMAT_JSON) {
                     columnDesc = new NereidsImportColumnDesc(column.getName());
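
With this change, when no file column list is specified, columns with a constant
mapping are excluded from the auto-generated file columns. For a hypothetical
three-column table (k00, k01, kd01), specifying only kd01 = '20230102' now
yields the effective mapping:

    COLUMNS (k00, k01)
    SET (kd01 = '20230102')

Non-constant mappings (e.g., k1 = k1 + 1) are intentionally not skipped, since
the source column still has to be read from the file.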
diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out
new file mode 100644
index 00000000000..61ae94b779e
Binary files /dev/null and b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out differ
diff --git a/regression-test/data/load_p0/stream_load/test_bitmap.csv b/regression-test/data/load_p0/stream_load/test_bitmap.csv
new file mode 100644
index 00000000000..671731ed874
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_bitmap.csv
@@ -0,0 +1 @@
+b,2,AA==
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out
new file mode 100644
index 00000000000..5e7cb4ad9f6
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out differ
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out
new file mode 100644
index 00000000000..70ffe11e1ed
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out differ
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy
new file mode 100644
index 00000000000..1987f548bb3
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_load_with_set", "load_p0") {
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+
+    def table = "s3_load_with_set"
+
+    sql """ DROP TABLE IF EXISTS ${table} """
+
+    sql """
+        CREATE TABLE ${table}
+        (
+            k00 INT             NOT NULL,
+            k01 DATE            NOT NULL,
+            k02 BOOLEAN         NULL,
+            k03 TINYINT         NULL,
+            k04 SMALLINT        NULL,
+            k05 INT             NULL,
+            k06 BIGINT          NULL,
+            k07 LARGEINT        NULL,
+            k08 FLOAT           NULL,
+            k09 DOUBLE          NULL,
+            k10 DECIMAL(9,1)           NULL,
+            k11 DECIMALV3(9,1)         NULL,
+            k12 DATETIME        NULL,
+            k13 DATEV2          NULL,
+            k14 DATETIMEV2      NULL,
+            k15 CHAR            NULL,
+            k16 VARCHAR         NULL,
+            k17 STRING          NULL,
+            k18 JSON            NULL,
+            kd01 DATE            NOT NULL 
+        )
+        DUPLICATE KEY(k00)
+        DISTRIBUTED BY HASH(k00) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    def attributesList = [
+
+    ]
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+            "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18)",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+            "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.parq",
+            "${table}", "", "", "FORMAT AS \"parquet\"", "(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.parq",
+            "${table}", "", "", "FORMAT AS \"parquet\"", "",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc",
+            "${table}", "", "", "FORMAT AS \"orc\"", "(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc",
+            "${table}", "", "", "FORMAT AS \"orc\"", "",
+            "set(kd01=20240123)", "", "", "", ""))
+
+    def ak = getS3AK()
+    def sk = getS3SK()
+
+    def i = 0
+    for (LoadAttributes attributes : attributesList) {
+        def label = "test_s3_load_" + 
UUID.randomUUID().toString().replace("-", "_") + "_" + i
+        attributes.label = label
+        def prop = attributes.getPropertiesStr()
+
+        def sql_str = """
+            LOAD LABEL $label (
+                $attributes.dataDesc.mergeType
+                DATA INFILE("$attributes.dataDesc.path")
+                INTO TABLE $attributes.dataDesc.tableName
+                $attributes.dataDesc.columnTermClause
+                $attributes.dataDesc.lineTermClause
+                $attributes.dataDesc.formatClause
+                $attributes.dataDesc.columns
+                $attributes.dataDesc.columnsFromPathClause
+                $attributes.dataDesc.columnMappingClause
+                $attributes.dataDesc.precedingFilterClause
+                $attributes.dataDesc.orderByClause
+                $attributes.dataDesc.whereExpr
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "${s3Endpoint}",
+                "AWS_REGION" = "${s3Region}",
+                "use_path_style" = "$attributes.usePathStyle"
+            )
+            ${prop}
+            """
+        logger.info("submit sql: ${sql_str}");
+        sql """${sql_str}"""
+        logger.info("Submit load with lable: $label, table: 
$attributes.dataDesc.tableName, path: $attributes.dataDesc.path")
+
+        def max_try_milli_secs = 600000
+        while (max_try_milli_secs > 0) {
+            String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """
+            if (result[0][2].equals("FINISHED")) {
+                if (attributes.isExceptFailed) {
+                    assertTrue(false, "load should be failed but was success: 
$result")
+                }
+                logger.info("Load FINISHED " + attributes.label + ": $result")
+                break
+            }
+            if (result[0][2].equals("CANCELLED")) {
+                if (attributes.isExceptFailed) {
+                    logger.info("Load FINISHED " + attributes.label)
+                    break
+                }
+                assertTrue(false, "load failed: $result")
+                break
+            }
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if (max_try_milli_secs <= 0) {
+                assertTrue(false, "load Timeout: $attributes.label")
+            }
+        }
+        qt_select """ select count(*) from $attributes.dataDesc.tableName """
+        ++i
+    }
+
+    qt_select """ select count(*) from ${table} """
+}
+
+class DataDesc {
+    public String mergeType = ""
+    public String path
+    public String tableName
+    public String lineTermClause
+    public String columnTermClause
+    public String formatClause
+    public String columns
+    public String columnsFromPathClause
+    public String precedingFilterClause
+    public String columnMappingClause
+    public String whereExpr
+    public String orderByClause
+}
+
+class LoadAttributes {
+    LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause,
+                   String columns, String columnsFromPathClause, String precedingFilterClause, String columnMappingClause,
+                   String whereExpr, String orderByClause, boolean isExceptFailed = false) {
+        this.dataDesc = new DataDesc()
+        this.dataDesc.path = path
+        this.dataDesc.tableName = tableName
+        this.dataDesc.lineTermClause = lineTermClause
+        this.dataDesc.columnTermClause = columnTermClause
+        this.dataDesc.formatClause = formatClause
+        this.dataDesc.columns = columns
+        this.dataDesc.columnsFromPathClause = columnsFromPathClause
+        this.dataDesc.precedingFilterClause = precedingFilterClause
+        this.dataDesc.columnMappingClause = columnMappingClause
+        this.dataDesc.whereExpr = whereExpr
+        this.dataDesc.orderByClause = orderByClause
+
+        this.isExceptFailed = isExceptFailed
+
+        properties = new HashMap<>()
+    }
+
+    LoadAttributes addProperties(String k, String v) {
+        properties.put(k, v)
+        return this
+    }
+
+    String getPropertiesStr() {
+        if (properties.isEmpty()) {
+            return ""
+        }
+        String prop = "PROPERTIES ("
+        properties.forEach (k, v) -> {
+            prop += "\"${k}\" = \"${v}\","
+        }
+        prop = prop.substring(0, prop.size() - 1)
+        prop += ")"
+        return prop
+    }
+
+    LoadAttributes withPathStyle() {
+        usePathStyle = "true"
+        return this
+    }
+
+    public DataDesc dataDesc
+    public Map<String, String> properties
+    public String label
+    public String usePathStyle = "false"
+    public boolean isExceptFailed
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
new file mode 100644
index 00000000000..0c22acc14c2
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_bitmap", "p0") {
+    def tableName = "test_stream_load_bitmap"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+          `cache_key` varchar(20) NOT NULL,
+          `result_cnt` int NULL,
+          `result` bitmap NOT NULL 
+        ) ENGINE=OLAP
+        UNIQUE KEY(`cache_key`)
+        DISTRIBUTED BY HASH(`cache_key`) BUCKETS 1
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // test loading a bitmap column converted via bitmap_from_base64 in the columns mapping
+    streamLoad {
+        table "${tableName}"
+
+        file 'test_bitmap.csv'
+        set "column_separator", ","
+        set "columns", 
"cache_key,result_cnt,result,result=bitmap_from_base64(result)"
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+        }
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql2 "select * from ${tableName}"
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
new file mode 100644
index 00000000000..0783d65c27d
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_set", "load_p0") {
+    def tableName = "stream_load_with_set"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            k00 INT             NOT NULL,
+            k01 DATE            NOT NULL,
+            k02 BOOLEAN         NULL,
+            k03 TINYINT         NULL,
+            k04 SMALLINT        NULL,
+            k05 INT             NULL,
+            k06 BIGINT          NULL,
+            k07 LARGEINT        NULL,
+            k08 FLOAT           NULL,
+            k09 DOUBLE          NULL,
+            k10 DECIMAL(9,1)           NULL,
+            k11 DECIMALV3(9,1)         NULL,
+            k12 DATETIME        NULL,
+            k13 DATEV2          NULL,
+            k14 DATETIMEV2      NULL,
+            k15 CHAR            NULL,
+            k16 VARCHAR         NULL,
+            kd01 DATE            NOT NULL,
+            k17 STRING          NULL,
+            k18 JSON            NULL
+        )
+        DUPLICATE KEY(k00)
+        DISTRIBUTED BY HASH(k00) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'columns', "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,kd01=20240123"
+        file "basic_data.csv"
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'columns', "kd01=20240123"
+        file "basic_data.csv"
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_select """ select count(*) from ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
