This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f3e1bc3ee [fix](nereids) bind file column placeholders for copy into select (#64395)
85f3e1bc3ee is described below

commit 85f3e1bc3eef4e4765ec5262c6fa363198bdcc81
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 12 22:24:57 2026 +0800

    [fix](nereids) bind file column placeholders for copy into select (#64395)
    
    ### What problem does this PR solve?
    
    PR https://github.com/apache/doris/pull/47194 introduced the Nereids
    COPY INTO command path.
    
    In this path, CopyIntoInfo analyzes column mappings and file filter
    expressions with a scope that only contains target table columns. For
    statements like:
    ```
    COPY INTO customer FROM (
        SELECT $1, $2, ...
        FROM @stage('xxx')
    )
    ```
    
    CopyFromDesc rewrites the select list into column mappings such as
    `C_CUSTKEY = $1`. Here `$1` is a valid file column placeholder, but it
    was analyzed against the target-table-only scope, so analysis failed
    with the error "unknown column '$1'".
    
    This patch adds synthetic file column slots ($1, $2, ...) to the COPY
    INTO expression analysis scope, infers their types from the target
    column mappings when possible (falling back to string otherwise), and
    translates them back to the legacy `SlotRef(null, "$N")` form when
    building DataDescription / CopyFromParam.
---
 .../trees/plans/commands/info/CopyFromDesc.java    |  33 ++++---
 .../trees/plans/commands/info/CopyIntoInfo.java    | 105 +++++++++++++++++++--
 .../suites/load_p0/copy_into/test_copy_into.groovy |  65 +++++++++++++
 3 files changed, 185 insertions(+), 18 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
index 443f2162f85..6d885ab6ca8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
@@ -18,7 +18,6 @@
 package org.apache.doris.nereids.trees.plans.commands.info;
 
 import org.apache.doris.analysis.CopyFromParam;
-import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StageAndPattern;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -203,23 +202,33 @@ public class CopyFromDesc {
         if (exprList == null) {
             return false;
         }
-        List<SlotRef> slotRefs = Lists.newArrayList();
-        //        Expr.collectList(exprList, SlotRef.class, slotRefs);
+        boolean hasFileColumnPlaceholder = false;
         Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
-        for (SlotRef slotRef : slotRefs) {
-            String columnName = slotRef.getColumnName();
-            if (columnName.startsWith(DOLLAR)) {
-                if (fileColumns.size() > 0) {
+        List<Expression> fileColumnExpressions = exprList.stream().map(expr -> 
(Expression) expr)
+                .collect(Collectors.toList());
+        fileFilterExpr.ifPresent(fileColumnExpressions::add);
+        for (Expression expr : fileColumnExpressions) {
+            for (UnboundSlot slot : 
expr.<UnboundSlot>collectToList(UnboundSlot.class::isInstance)) {
+                String columnName = slot.getName();
+                if (columnName.startsWith(DOLLAR)) {
+                    if (!fileColumns.isEmpty()) {
+                        throw new AnalysisException("can not mix column name 
and dollar sign");
+                    }
+                    hasFileColumnPlaceholder = true;
+                    continue;
+                }
+                if (hasFileColumnPlaceholder) {
                     throw new AnalysisException("can not mix column name and 
dollar sign");
                 }
-                return false;
-            }
-            if (columnSet.add(columnName)) {
-                fileColumns.add(columnName);
+                if (columnSet.add(columnName)) {
+                    fileColumns.add(columnName);
+                }
             }
         }
+        if (hasFileColumnPlaceholder) {
+            return false;
+        }
         if (addDeleteSign) {
-            //            exprList.add(new SlotRef(null, Column.DELETE_SIGN));
             fileColumns.add(Column.DELETE_SIGN);
         }
         return true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
index 1ae2de6eaa7..6fd99573b80 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CopyFromParam;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StageAndPattern;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -49,6 +50,7 @@ import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.analyzer.Scope;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.nereids.jobs.executor.Analyzer;
@@ -58,6 +60,8 @@ import org.apache.doris.nereids.rules.analysis.BindRelation;
 import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
 import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
 import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -66,6 +70,8 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
 import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.StringType;
 import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
@@ -75,6 +81,7 @@ import org.apache.doris.qe.ShowResultSetMetaData;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -234,8 +241,10 @@ public class CopyIntoInfo {
         }
         PlanTranslatorContext context = new 
PlanTranslatorContext(cascadesContext);
         List<Slot> slots = boundRelation.getOutput();
-        Scope scope = new Scope(slots);
-        ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope, 
cascadesContext, false, false);
+        CopyIntoFileSlots fileSlots = new CopyIntoFileSlots(slots, 
copyFromDesc.getFileColumns(),
+                copyFromDesc.getColumnMappingList());
+        ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, new 
Scope(fileSlots.getScopeSlots()),
+                cascadesContext, false, false);
 
         TupleDescriptor tupleDescriptor = context.generateTupleDesc();
         tupleDescriptor.setTable(((OlapScan) boundRelation).getTable());
@@ -248,13 +257,14 @@ public class CopyIntoInfo {
         if (copyFromDesc.getColumnMappingList() != null && 
!copyFromDesc.getColumnMappingList().isEmpty()) {
             legacyColumnMappingList = new ArrayList<>();
             for (Expression expression : copyFromDesc.getColumnMappingList()) {
-                legacyColumnMappingList.add(translateToLegacyExpr(expression, 
analyzer, context, cascadesContext));
+                legacyColumnMappingList.add(translateToLegacyExpr(expression, 
analyzer, context, cascadesContext,
+                        fileSlots));
             }
         }
         Expr legacyFileFilterExpr = null;
         if (copyFromDesc.getFileFilterExpr().isPresent()) {
             legacyFileFilterExpr = 
translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
-                    analyzer, context, cascadesContext);
+                    analyzer, context, cascadesContext, fileSlots);
         }
 
         String compression = copyIntoProperties.getCompression();
@@ -301,7 +311,7 @@ public class CopyIntoInfo {
     }
 
     private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer 
analyzer, PlanTranslatorContext context,
-                                       CascadesContext cascadesContext) {
+                                       CascadesContext cascadesContext, 
CopyIntoFileSlots fileSlots) {
         Expression expression;
         try {
             expression = analyzer.analyze(expr, new 
ExpressionRewriteContext(cascadesContext));
@@ -309,11 +319,26 @@ public class CopyIntoInfo {
             throw new 
org.apache.doris.nereids.exceptions.AnalysisException("In where clause '"
                     + expr.toSql() + "', " + 
Utils.convertFirstChar(e.getMessage()));
         }
-        ExpressionToExpr translator = new ExpressionToExpr();
+        ExpressionToExpr translator = new ExpressionToExpr(fileSlots);
         return expression.accept(translator, context);
     }
 
     private static class ExpressionToExpr extends ExpressionTranslator {
+        private final CopyIntoFileSlots fileSlots;
+
+        private ExpressionToExpr(CopyIntoFileSlots fileSlots) {
+            this.fileSlots = fileSlots;
+        }
+
+        @Override
+        public Expr visitSlotReference(SlotReference slotReference, 
PlanTranslatorContext context) {
+            String fileSlotName = 
fileSlots.getFileSlotName(slotReference.getExprId());
+            if (fileSlotName != null) {
+                return new SlotRef(null, fileSlotName);
+            }
+            return super.visitSlotReference(slotReference, context);
+        }
+
         @Override
         public Expr visitCast(Cast cast, PlanTranslatorContext context) {
             // left child of cast is target type, right child of cast is 
expression
@@ -322,6 +347,74 @@ public class CopyIntoInfo {
         }
     }
 
+    private static class CopyIntoFileSlots {
+        private final List<Slot> scopeSlots;
+        private final Map<ExprId, String> fileSlotNames = Maps.newHashMap();
+
+        private CopyIntoFileSlots(List<Slot> targetSlots, List<String> 
fileColumns,
+                List<Expression> columnMappingList) {
+            scopeSlots = new ArrayList<>(targetSlots);
+            if (fileColumns == null) {
+                return;
+            }
+            Map<String, DataType> targetColumnTypes = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+            for (Slot slot : targetSlots) {
+                targetColumnTypes.put(slot.getName(), slot.getDataType());
+            }
+            Map<String, DataType> fileColumnTypes = 
inferFileColumnTypes(targetColumnTypes, columnMappingList);
+            for (String fileColumn : fileColumns) {
+                if (!isFileColumnPlaceholder(fileColumn) || 
fileSlotNames.containsValue(fileColumn)) {
+                    continue;
+                }
+                SlotReference slot = new SlotReference(fileColumn,
+                        fileColumnTypes.getOrDefault(fileColumn, 
StringType.INSTANCE), true);
+                scopeSlots.add(slot);
+                fileSlotNames.put(slot.getExprId(), fileColumn);
+            }
+        }
+
+        private List<Slot> getScopeSlots() {
+            return scopeSlots;
+        }
+
+        private String getFileSlotName(ExprId exprId) {
+            return fileSlotNames.get(exprId);
+        }
+
+        private static boolean isFileColumnPlaceholder(String columnName) {
+            return columnName != null && columnName.startsWith("$");
+        }
+
+        private static Map<String, DataType> inferFileColumnTypes(Map<String, 
DataType> targetColumnTypes,
+                List<Expression> columnMappingList) {
+            Map<String, DataType> fileColumnTypes = Maps.newHashMap();
+            if (columnMappingList == null) {
+                return fileColumnTypes;
+            }
+            for (Expression expression : columnMappingList) {
+                if (!(expression instanceof EqualTo)) {
+                    continue;
+                }
+                EqualTo columnMapping = (EqualTo) expression;
+                if (!(columnMapping.left() instanceof UnboundSlot)) {
+                    continue;
+                }
+                DataType targetType = targetColumnTypes.get(((UnboundSlot) 
columnMapping.left()).getName());
+                if (targetType == null) {
+                    continue;
+                }
+                for (UnboundSlot fileColumn : columnMapping.right()
+                        .<UnboundSlot>collect(UnboundSlot.class::isInstance)) {
+                    String fileColumnName = fileColumn.getName();
+                    if (isFileColumnPlaceholder(fileColumnName)) {
+                        fileColumnTypes.putIfAbsent(fileColumnName, 
targetType);
+                    }
+                }
+            }
+            return fileColumnTypes;
+        }
+    }
+
     // after validateStagePB, fileFormat and copyOption is not null
     private void validateStagePB(StagePB stagePB) throws AnalysisException {
         stageType = stagePB.getType();
diff --git a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy 
b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
index 64e448731ca..6cfd078db7d 100644
--- a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
+++ b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
@@ -150,6 +150,71 @@ suite("test_copy_into", "p0") {
             }
             assertTrue(false, "should not come here")
         }
+
+        def csvStageName = "test_copy_into_csv"
+        try_sql """drop stage if exists ${csvStageName}"""
+        sql """
+            create stage if not exists ${csvStageName}
+            properties ('endpoint' = '${getS3Endpoint()}' ,
+            'region' = '${getS3Region()}' ,
+            'bucket' = '${getS3BucketName()}' ,
+            'prefix' = 'regression' ,
+            'ak' = '${getS3AK()}' ,
+            'sk' = '${getS3SK()}' ,
+            'provider' = '${getS3Provider()}',
+            'access_type' = 'aksk',
+            'default.file.column_separator' = "|");
+        """
+
+        sql """ DROP TABLE IF EXISTS copy_into_select_placeholder; """
+        sql """
+            CREATE TABLE copy_into_select_placeholder (
+                    p_partkey     int NOT NULL DEFAULT "1",
+                    p_name        VARCHAR(55) NOT NULL DEFAULT "2",
+                    p_mfgr        VARCHAR(25) NOT NULL DEFAULT "3"
+                    )ENGINE=OLAP
+            DUPLICATE KEY(`p_partkey`)
+            COMMENT "OLAP"
+            DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+        """
+
+        result = sql """
+            copy into copy_into_select_placeholder
+            from (select \$1, \$2, \$3 from 
@${csvStageName}('tpch/sf1/part.csv.split00.gz'))
+            properties ('file.type' = 'csv', 'file.column_separator' = '|',
+                    'file.compression' = 'gz', 'copy.async' = 'false');
+            """
+        logger.info("copy select placeholder result: " + result)
+        assertTrue(result.size() == 1)
+        assertTrue(result[0][1].equals("FINISHED"),
+                "Finish copy into, state=" + result[0][1] + ", expected 
state=FINISHED")
+        def selectPlaceholderCount = sql """ SELECT COUNT(*) FROM 
copy_into_select_placeholder; """
+        assertTrue((selectPlaceholderCount[0][0] as long) > 0)
+
+        sql """ DROP TABLE IF EXISTS copy_into_filter_placeholder; """
+        sql """
+            CREATE TABLE copy_into_filter_placeholder (
+                    p_partkey     int NOT NULL DEFAULT "1"
+                    )ENGINE=OLAP
+            DUPLICATE KEY(`p_partkey`)
+            COMMENT "OLAP"
+            DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+        """
+
+        result = sql """
+            copy into copy_into_filter_placeholder (p_partkey)
+            from (select 1 from 
@${csvStageName}('tpch/sf1/part.csv.split00.gz') where \$1 is not null)
+            properties ('file.type' = 'csv', 'file.column_separator' = '|',
+                    'file.compression' = 'gz', 'copy.async' = 'false');
+            """
+        logger.info("copy filter placeholder result: " + result)
+        assertTrue(result.size() == 1)
+        assertTrue(result[0][1].equals("FINISHED"),
+                "Finish copy into, state=" + result[0][1] + ", expected 
state=FINISHED")
+        def filterPlaceholderCount = sql """ SELECT COUNT(*) FROM 
copy_into_filter_placeholder; """
+        assertTrue((filterPlaceholderCount[0][0] as long) > 0)
+
+        try_sql """drop stage if exists ${csvStageName}"""
         try_sql """drop stage if exists ${externalStageName}"""
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to