This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 85f3e1bc3ee [fix](nereids) bind file column placeholders for copy into select (#64395)
85f3e1bc3ee is described below
commit 85f3e1bc3eef4e4765ec5262c6fa363198bdcc81
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 12 22:24:57 2026 +0800
[fix](nereids) bind file column placeholders for copy into select (#64395)
### What problem does this PR solve?
PR https://github.com/apache/doris/pull/47194 introduced the Nereids
COPY INTO command path.
In this path, CopyIntoInfo analyzes column mappings and file filter
expressions using a scope that contains only the target table's columns.
For statements like:
```
COPY INTO customer FROM (
SELECT $1, $2, ...
FROM @stage('xxx')
)
```
CopyFromDesc rewrites the select list into column mappings such as
`C_CUSTKEY = $1`. `$1` is a valid file column placeholder, but it was
analyzed against the target-table-only scope, so analysis failed with
`unknown column '$1'`.
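This patch adds synthetic file column slots (`$1`, `$2`, ...) to the COPY
INTO expression analysis scope. Where possible, it infers each slot's type
from the target column of the first column mapping that references it, and
falls back to STRING otherwise. When building DataDescription /
CopyFromParam, it translates the slots back to legacy `SlotRef(null, "$N")`.
It also re-enables the CopyFromDesc check that rejects mixing named file
columns with `$N` placeholders. The previous loop iterated over a slot list
that was never populated, so that check never ran.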
---
.../trees/plans/commands/info/CopyFromDesc.java | 33 ++++---
.../trees/plans/commands/info/CopyIntoInfo.java | 105 +++++++++++++++++++--
.../suites/load_p0/copy_into/test_copy_into.groovy | 65 +++++++++++++
3 files changed, 185 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
index 443f2162f85..6d885ab6ca8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.CopyFromParam;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StageAndPattern;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -203,23 +202,33 @@ public class CopyFromDesc {
if (exprList == null) {
return false;
}
- List<SlotRef> slotRefs = Lists.newArrayList();
- // Expr.collectList(exprList, SlotRef.class, slotRefs);
+ boolean hasFileColumnPlaceholder = false;
Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
- for (SlotRef slotRef : slotRefs) {
- String columnName = slotRef.getColumnName();
- if (columnName.startsWith(DOLLAR)) {
- if (fileColumns.size() > 0) {
+ List<Expression> fileColumnExpressions = exprList.stream().map(expr ->
(Expression) expr)
+ .collect(Collectors.toList());
+ fileFilterExpr.ifPresent(fileColumnExpressions::add);
+ for (Expression expr : fileColumnExpressions) {
+ for (UnboundSlot slot :
expr.<UnboundSlot>collectToList(UnboundSlot.class::isInstance)) {
+ String columnName = slot.getName();
+ if (columnName.startsWith(DOLLAR)) {
+ if (!fileColumns.isEmpty()) {
+ throw new AnalysisException("can not mix column name
and dollar sign");
+ }
+ hasFileColumnPlaceholder = true;
+ continue;
+ }
+ if (hasFileColumnPlaceholder) {
throw new AnalysisException("can not mix column name and
dollar sign");
}
- return false;
- }
- if (columnSet.add(columnName)) {
- fileColumns.add(columnName);
+ if (columnSet.add(columnName)) {
+ fileColumns.add(columnName);
+ }
}
}
+ if (hasFileColumnPlaceholder) {
+ return false;
+ }
if (addDeleteSign) {
- // exprList.add(new SlotRef(null, Column.DELETE_SIGN));
fileColumns.add(Column.DELETE_SIGN);
}
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
index 1ae2de6eaa7..6fd99573b80 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CopyFromParam;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StageAndPattern;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
@@ -49,6 +50,7 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
@@ -58,6 +60,8 @@ import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -66,6 +70,8 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
@@ -75,6 +81,7 @@ import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -234,8 +241,10 @@ public class CopyIntoInfo {
}
PlanTranslatorContext context = new
PlanTranslatorContext(cascadesContext);
List<Slot> slots = boundRelation.getOutput();
- Scope scope = new Scope(slots);
- ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope,
cascadesContext, false, false);
+ CopyIntoFileSlots fileSlots = new CopyIntoFileSlots(slots,
copyFromDesc.getFileColumns(),
+ copyFromDesc.getColumnMappingList());
+ ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, new
Scope(fileSlots.getScopeSlots()),
+ cascadesContext, false, false);
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(((OlapScan) boundRelation).getTable());
@@ -248,13 +257,14 @@ public class CopyIntoInfo {
if (copyFromDesc.getColumnMappingList() != null &&
!copyFromDesc.getColumnMappingList().isEmpty()) {
legacyColumnMappingList = new ArrayList<>();
for (Expression expression : copyFromDesc.getColumnMappingList()) {
- legacyColumnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext));
+ legacyColumnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext,
+ fileSlots));
}
}
Expr legacyFileFilterExpr = null;
if (copyFromDesc.getFileFilterExpr().isPresent()) {
legacyFileFilterExpr =
translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
- analyzer, context, cascadesContext);
+ analyzer, context, cascadesContext, fileSlots);
}
String compression = copyIntoProperties.getCompression();
@@ -301,7 +311,7 @@ public class CopyIntoInfo {
}
private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer
analyzer, PlanTranslatorContext context,
- CascadesContext cascadesContext) {
+ CascadesContext cascadesContext,
CopyIntoFileSlots fileSlots) {
Expression expression;
try {
expression = analyzer.analyze(expr, new
ExpressionRewriteContext(cascadesContext));
@@ -309,11 +319,26 @@ public class CopyIntoInfo {
throw new
org.apache.doris.nereids.exceptions.AnalysisException("In where clause '"
+ expr.toSql() + "', " +
Utils.convertFirstChar(e.getMessage()));
}
- ExpressionToExpr translator = new ExpressionToExpr();
+ ExpressionToExpr translator = new ExpressionToExpr(fileSlots);
return expression.accept(translator, context);
}
private static class ExpressionToExpr extends ExpressionTranslator {
+ private final CopyIntoFileSlots fileSlots;
+
+ private ExpressionToExpr(CopyIntoFileSlots fileSlots) {
+ this.fileSlots = fileSlots;
+ }
+
+ @Override
+ public Expr visitSlotReference(SlotReference slotReference,
PlanTranslatorContext context) {
+ String fileSlotName =
fileSlots.getFileSlotName(slotReference.getExprId());
+ if (fileSlotName != null) {
+ return new SlotRef(null, fileSlotName);
+ }
+ return super.visitSlotReference(slotReference, context);
+ }
+
@Override
public Expr visitCast(Cast cast, PlanTranslatorContext context) {
// left child of cast is target type, right child of cast is
expression
@@ -322,6 +347,74 @@ public class CopyIntoInfo {
}
}
+ private static class CopyIntoFileSlots {
+ private final List<Slot> scopeSlots;
+ private final Map<ExprId, String> fileSlotNames = Maps.newHashMap();
+
+ private CopyIntoFileSlots(List<Slot> targetSlots, List<String>
fileColumns,
+ List<Expression> columnMappingList) {
+ scopeSlots = new ArrayList<>(targetSlots);
+ if (fileColumns == null) {
+ return;
+ }
+ Map<String, DataType> targetColumnTypes =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ for (Slot slot : targetSlots) {
+ targetColumnTypes.put(slot.getName(), slot.getDataType());
+ }
+ Map<String, DataType> fileColumnTypes =
inferFileColumnTypes(targetColumnTypes, columnMappingList);
+ for (String fileColumn : fileColumns) {
+ if (!isFileColumnPlaceholder(fileColumn) ||
fileSlotNames.containsValue(fileColumn)) {
+ continue;
+ }
+ SlotReference slot = new SlotReference(fileColumn,
+ fileColumnTypes.getOrDefault(fileColumn,
StringType.INSTANCE), true);
+ scopeSlots.add(slot);
+ fileSlotNames.put(slot.getExprId(), fileColumn);
+ }
+ }
+
+ private List<Slot> getScopeSlots() {
+ return scopeSlots;
+ }
+
+ private String getFileSlotName(ExprId exprId) {
+ return fileSlotNames.get(exprId);
+ }
+
+ private static boolean isFileColumnPlaceholder(String columnName) {
+ return columnName != null && columnName.startsWith("$");
+ }
+
+ private static Map<String, DataType> inferFileColumnTypes(Map<String,
DataType> targetColumnTypes,
+ List<Expression> columnMappingList) {
+ Map<String, DataType> fileColumnTypes = Maps.newHashMap();
+ if (columnMappingList == null) {
+ return fileColumnTypes;
+ }
+ for (Expression expression : columnMappingList) {
+ if (!(expression instanceof EqualTo)) {
+ continue;
+ }
+ EqualTo columnMapping = (EqualTo) expression;
+ if (!(columnMapping.left() instanceof UnboundSlot)) {
+ continue;
+ }
+ DataType targetType = targetColumnTypes.get(((UnboundSlot)
columnMapping.left()).getName());
+ if (targetType == null) {
+ continue;
+ }
+ for (UnboundSlot fileColumn : columnMapping.right()
+ .<UnboundSlot>collect(UnboundSlot.class::isInstance)) {
+ String fileColumnName = fileColumn.getName();
+ if (isFileColumnPlaceholder(fileColumnName)) {
+ fileColumnTypes.putIfAbsent(fileColumnName,
targetType);
+ }
+ }
+ }
+ return fileColumnTypes;
+ }
+ }
+
// after validateStagePB, fileFormat and copyOption is not null
private void validateStagePB(StagePB stagePB) throws AnalysisException {
stageType = stagePB.getType();
diff --git a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
index 64e448731ca..6cfd078db7d 100644
--- a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
+++ b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
@@ -150,6 +150,71 @@ suite("test_copy_into", "p0") {
}
assertTrue(false, "should not come here")
}
+
+ def csvStageName = "test_copy_into_csv"
+ try_sql """drop stage if exists ${csvStageName}"""
+ sql """
+ create stage if not exists ${csvStageName}
+ properties ('endpoint' = '${getS3Endpoint()}' ,
+ 'region' = '${getS3Region()}' ,
+ 'bucket' = '${getS3BucketName()}' ,
+ 'prefix' = 'regression' ,
+ 'ak' = '${getS3AK()}' ,
+ 'sk' = '${getS3SK()}' ,
+ 'provider' = '${getS3Provider()}',
+ 'access_type' = 'aksk',
+ 'default.file.column_separator' = "|");
+ """
+
+ sql """ DROP TABLE IF EXISTS copy_into_select_placeholder; """
+ sql """
+ CREATE TABLE copy_into_select_placeholder (
+ p_partkey int NOT NULL DEFAULT "1",
+ p_name VARCHAR(55) NOT NULL DEFAULT "2",
+ p_mfgr VARCHAR(25) NOT NULL DEFAULT "3"
+ )ENGINE=OLAP
+ DUPLICATE KEY(`p_partkey`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+ """
+
+ result = sql """
+ copy into copy_into_select_placeholder
+ from (select \$1, \$2, \$3 from
@${csvStageName}('tpch/sf1/part.csv.split00.gz'))
+ properties ('file.type' = 'csv', 'file.column_separator' = '|',
+ 'file.compression' = 'gz', 'copy.async' = 'false');
+ """
+ logger.info("copy select placeholder result: " + result)
+ assertTrue(result.size() == 1)
+ assertTrue(result[0][1].equals("FINISHED"),
+ "Finish copy into, state=" + result[0][1] + ", expected
state=FINISHED")
+ def selectPlaceholderCount = sql """ SELECT COUNT(*) FROM
copy_into_select_placeholder; """
+ assertTrue((selectPlaceholderCount[0][0] as long) > 0)
+
+ sql """ DROP TABLE IF EXISTS copy_into_filter_placeholder; """
+ sql """
+ CREATE TABLE copy_into_filter_placeholder (
+ p_partkey int NOT NULL DEFAULT "1"
+ )ENGINE=OLAP
+ DUPLICATE KEY(`p_partkey`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+ """
+
+ result = sql """
+ copy into copy_into_filter_placeholder (p_partkey)
+ from (select 1 from
@${csvStageName}('tpch/sf1/part.csv.split00.gz') where \$1 is not null)
+ properties ('file.type' = 'csv', 'file.column_separator' = '|',
+ 'file.compression' = 'gz', 'copy.async' = 'false');
+ """
+ logger.info("copy filter placeholder result: " + result)
+ assertTrue(result.size() == 1)
+ assertTrue(result[0][1].equals("FINISHED"),
+ "Finish copy into, state=" + result[0][1] + ", expected
state=FINISHED")
+ def filterPlaceholderCount = sql """ SELECT COUNT(*) FROM
copy_into_filter_placeholder; """
+ assertTrue((filterPlaceholderCount[0][0] as long) > 0)
+
+ try_sql """drop stage if exists ${csvStageName}"""
try_sql """drop stage if exists ${externalStageName}"""
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]