This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd90fc6f70b [feat](Nereids) support copy into command (#47194) bd90fc6f70b is described below commit bd90fc6f70b9a24911642cdc55da842fd59b2a84 Author: LiBinfeng <libinf...@selectdb.com> AuthorDate: Thu Apr 24 18:25:07 2025 +0800 [feat](Nereids) support copy into command (#47194) --- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 8 +- .../java/org/apache/doris/analysis/CastExpr.java | 3 + .../org/apache/doris/analysis/CopyFromParam.java | 10 + .../apache/doris/analysis/CopyIntoProperties.java | 2 +- .../org/apache/doris/analysis/CopyProperties.java | 7 +- .../java/org/apache/doris/analysis/CopyStmt.java | 35 +- .../org/apache/doris/analysis/DataDescription.java | 5 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 80 +++++ .../apache/doris/nereids/trees/plans/PlanType.java | 1 + .../trees/plans/commands/CopyIntoCommand.java | 62 ++++ .../trees/plans/commands/info/CopyFromDesc.java} | 139 ++++---- .../trees/plans/commands/info/CopyIntoInfo.java | 358 +++++++++++++++++++++ .../trees/plans/visitor/CommandVisitor.java | 5 + .../main/java/org/apache/doris/qe/DdlExecutor.java | 8 +- .../suites/load_p0/copy_into/test_copy_into.groovy | 15 +- 15 files changed, 661 insertions(+), 77 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 18f61acc89a..f2eada27362 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -162,6 +162,10 @@ supportedDmlStatement (propertyClause)? (withRemoteStorageSystem)? #export | replayCommand #replay + | COPY INTO selectHint? name=multipartIdentifier columns=identifierList? FROM + (stageAndPattern | (LEFT_PAREN SELECT selectColumnClause + FROM stageAndPattern whereClause? RIGHT_PAREN)) + properties=propertyClause? #copyInto ; supportedCreateStatement @@ -895,10 +899,6 @@ unsupportedUseStatement unsupportedDmlStatement : TRUNCATE TABLE multipartIdentifier specifiedPartition? FORCE? #truncateTable - | COPY INTO name=multipartIdentifier columns=identifierList? FROM - (stageAndPattern | (LEFT_PAREN SELECT selectColumnClause - FROM stageAndPattern whereClause? RIGHT_PAREN)) - properties=propertyClause? #copyInto ; stageAndPattern diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java index aab6a9dbec0..f83cae85b7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java @@ -140,6 +140,9 @@ public class CastExpr extends Expr { if (type.isStructType() && e.type.isStructType()) { getChild(0).setType(type); } + if (type.isScalarType()) { + targetTypeDef = new TypeDef(type); + } analysisDone(); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java index b2a57cd1f5d..2158220fde0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java @@ -67,6 +67,16 @@ public class CopyFromParam { this.fileFilterExpr = whereExpr; } + public CopyFromParam(StageAndPattern stageAndPattern, List<Expr> exprList, Expr fileFilterExpr, + List<String> fileColumns, List<Expr> columnMappingList, List<String> targetColumns) { + this.stageAndPattern = stageAndPattern; + this.exprList = exprList; + this.fileFilterExpr = fileFilterExpr; + this.fileColumns = fileColumns; + this.columnMappingList = columnMappingList; + this.targetColumns = targetColumns; + } + public void analyze(String fullDbName, TableName tableName, boolean useDeleteSign, String fileType) throws AnalysisException { if (exprList == null && fileFilterExpr == null && !useDeleteSign) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java index 4291fda7581..792fc6be901 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java @@ -84,7 +84,7 @@ public class CopyIntoProperties extends CopyProperties { return result; } - protected void mergeProperties(StageProperties stageProperties) { + public void mergeProperties(StageProperties stageProperties) { Map<String, String> properties = stageProperties.getDefaultPropertiesWithoutPrefix(); for (Entry<String, String> entry : properties.entrySet()) { if (!this.properties.containsKey(entry.getKey())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java index f6807b1bf79..d70976d3f3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java @@ -22,6 +22,7 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.commons.lang3.StringUtils; +import java.util.HashMap; import java.util.Map; public class CopyProperties { @@ -57,7 +58,11 @@ public class CopyProperties { public static final String USE_DELETE_SIGN = COPY_PREFIX + "use_delete_sign"; public CopyProperties(Map<String, String> properties, String prefix) { - this.properties = properties; + Map<String, String> newProperties = new HashMap<>(); + for (String key : properties.keySet()) { + newProperties.put(key, properties.get(key)); + } + this.properties = newProperties; this.prefix = prefix; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java index 6bd4d3506d1..3a916f18640 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.property.constants.BosProperties; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.load.loadv2.LoadTask.MergeType; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.ShowResultSetMetaData; @@ -80,8 +81,8 @@ public class CopyStmt extends DdlStmt implements NotFallbackInParser { private LabelName label = null; private BrokerDesc brokerDesc = null; private DataDescription dataDescription = null; - private final Map<String, String> brokerProperties = new HashMap<>(); - private final Map<String, String> properties = new HashMap<>(); + private Map<String, String> brokerProperties = new HashMap<>(); + private Map<String, String> properties = new HashMap<>(); @Getter private String stage; @@ -110,6 +111,36 @@ public class CopyStmt extends DdlStmt implements NotFallbackInParser { } } + /** + * Use for Nereids Planner. + */ + public CopyStmt(TableName tableName, CopyFromParam copyFromParam, + CopyIntoProperties copyProperties, Map<String, Map<String, String>> optHints, LabelName label, + String stageId, StageType stageType, String stagePrefix, ObjectInfo objectInfo, String userName, + Map<String, String> brokerProperties, Map<String, String> properties, + DataDescription dataDescription, BrokerDesc brokerDesc, OriginStatement originStmt) { + this.tableName = tableName; + this.copyFromParam = copyFromParam; + this.stage = copyFromParam.getStageAndPattern().getStageName(); + this.copyIntoProperties = copyProperties; + if (optHints != null) { + this.optHints = optHints.get(SET_VAR_KEY); + } + + this.label = label; + this.brokerDesc = brokerDesc; + this.brokerProperties = brokerProperties; + this.properties = properties; + + this.stageId = stageId; + this.stageType = stageType; + this.stagePrefix = stagePrefix; + this.objectInfo = objectInfo; + this.userName = userName; + this.dataDescription = dataDescription; + this.setOrigStmt(originStmt); + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 6776eecc1c1..bdfed0814a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -845,7 +845,10 @@ public class DataDescription implements InsertStmt.DataDesc { + "The mapping operator error, op: " + predicate.getOp()); } Expr child0 = predicate.getChild(0); - if (!(child0 instanceof SlotRef)) { + if (child0 instanceof CastExpr && child0.getChild(0) instanceof SlotRef) { + predicate.setChild(0, child0.getChild(0)); + child0 = predicate.getChild(0); + } else if (!(child0 instanceof SlotRef)) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " + "The mapping column error. column: " + child0.toSql()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index d54ee98b0cf..85a443ae344 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.PassVar; import org.apache.doris.analysis.PasswordOptions; import org.apache.doris.analysis.SetType; +import org.apache.doris.analysis.StageAndPattern; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableScanParams; @@ -561,6 +562,7 @@ import org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand; import org.apache.doris.nereids.trees.plans.commands.CleanAllProfileCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; +import org.apache.doris.nereids.trees.plans.commands.CopyIntoCommand; import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand; import org.apache.doris.nereids.trees.plans.commands.CreateEncryptkeyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand; @@ -740,6 +742,8 @@ import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CopyFromDesc; +import org.apache.doris.nereids.trees.plans.commands.info.CopyIntoInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateIndexOp; import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; @@ -1012,6 +1016,82 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { return new CancelJobTaskCommand(jobName, taskId); } + private StageAndPattern getStageAndPattern(DorisParser.StageAndPatternContext ctx) { + if (ctx.pattern != null) { + return new StageAndPattern(stripQuotes(ctx.stage.getText()), stripQuotes(ctx.pattern.getText())); + } else { + return new StageAndPattern(stripQuotes(ctx.stage.getText()), null); + } + } + + @Override + public LogicalPlan visitCopyInto(DorisParser.CopyIntoContext ctx) { + ImmutableList.Builder<String> tableName = ImmutableList.builder(); + if (null != ctx.name) { + List<String> nameParts = visitMultipartIdentifier(ctx.name); + tableName.addAll(nameParts); + } + List<String> columns = (null != ctx.columns) ? visitIdentifierList(ctx.columns) : null; + StageAndPattern stageAndPattern = getStageAndPattern(ctx.stageAndPattern()); + CopyFromDesc copyFromDesc = null; + if (null != ctx.SELECT()) { + List<NamedExpression> projects = getNamedExpressions(ctx.selectColumnClause().namedExpressionSeq()); + Optional<Expression> where = Optional.empty(); + if (ctx.whereClause() != null) { + where = Optional.of(getExpression(ctx.whereClause().booleanExpression())); + } + copyFromDesc = new CopyFromDesc(stageAndPattern, projects, where); + } else { + copyFromDesc = new CopyFromDesc(stageAndPattern); + } + Map<String, String> properties = visitPropertyClause(ctx.properties); + copyFromDesc.setTargetColumns(columns); + CopyIntoInfo copyInfoInfo = null; + if (null != ctx.selectHint()) { + if ((selectHintMap == null) || selectHintMap.isEmpty()) { + throw new AnalysisException("hint should be in right place: " + ctx.getText()); + } + List<ParserRuleContext> selectHintContexts = Lists.newArrayList(); + for (Integer key : selectHintMap.keySet()) { + if (key > ctx.getStart().getStopIndex() && key < ctx.getStop().getStartIndex()) { + selectHintContexts.add(selectHintMap.get(key)); + } + } + if (selectHintContexts.size() != 1) { + throw new AnalysisException("only one hint is allowed in: " + ctx.getText()); + } + SelectHintContext selectHintContext = (SelectHintContext) selectHintContexts.get(0); + Map<String, String> parameterNames = Maps.newLinkedHashMap(); + for (HintStatementContext hintStatement : selectHintContext.hintStatements) { + String hintName = hintStatement.hintName.getText().toLowerCase(Locale.ROOT); + if (!hintName.equalsIgnoreCase("set_var")) { + throw new AnalysisException("only set_var hint is allowed in: " + ctx.getText()); + } + for (HintAssignmentContext kv : hintStatement.parameters) { + if (kv.key != null) { + String parameterName = visitIdentifierOrText(kv.key); + Optional<String> value = Optional.empty(); + if (kv.constantValue != null) { + Literal literal = (Literal) visit(kv.constantValue); + value = Optional.ofNullable(literal.toLegacyLiteral().getStringValue()); + } else if (kv.identifierValue != null) { + // maybe we should throw exception when the identifierValue is quoted identifier + value = Optional.ofNullable(kv.identifierValue.getText()); + } + parameterNames.put(parameterName, value.get()); + } + } + } + Map<String, Map<String, String>> setVarHint = Maps.newLinkedHashMap(); + setVarHint.put("set_var", parameterNames); + copyInfoInfo = new CopyIntoInfo(tableName.build(), copyFromDesc, properties, setVarHint); + } else { + copyInfoInfo = new CopyIntoInfo(tableName.build(), copyFromDesc, properties, null); + } + + return new CopyIntoCommand(copyInfoInfo); + } + @Override public String visitCommentSpec(DorisParser.CommentSpecContext ctx) { String commentSpec = ctx == null ? "''" : ctx.STRING_LITERAL().getText(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 1ab6d5fd1e1..0d60298421a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -135,6 +135,7 @@ public enum PlanType { // commands ADMIN_CHECK_TABLETS_COMMAND, + COPY_INTO_COMMAND, CREATE_POLICY_COMMAND, CREATE_TABLE_COMMAND, CREATE_SQL_BLOCK_RULE_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java new file mode 100644 index 00000000000..9fac32c7ad9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.CopyStmt; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.CopyIntoInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.DdlExecutor; +import org.apache.doris.qe.StmtExecutor; + +/** + * copy into command + */ +public class CopyIntoCommand extends Command implements ForwardWithSync { + + CopyIntoInfo copyIntoInfo; + + /** + * Use for copy into command. + */ + public CopyIntoCommand(CopyIntoInfo info) { + super(PlanType.COPY_INTO_COMMAND); + this.copyIntoInfo = info; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + copyIntoInfo.validate(ctx); + CopyStmt copyStmt = copyIntoInfo.toLegacyStatement(executor.getOriginStmt()); + DdlExecutor.executeCopyStmt(ctx.getEnv(), copyStmt); + // copy into used + if (executor.getContext().getState().getResultSet() != null) { + if (executor.isProxy()) { + executor.setProxyShowResultSet(executor.getContext().getState().getResultSet()); + return; + } + executor.sendResultSet(executor.getContext().getState().getResultSet()); + } + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitCopyIntoCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java similarity index 70% copy from fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java index b2a57cd1f5d..3ba1ec67b06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.analysis; +package org.apache.doris.nereids.trees.plans.commands.info; -import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.analysis.CopyFromParam; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StageAndPattern; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -26,48 +29,81 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.analyzer.UnboundStar; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.util.ExpressionUtils; -import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import lombok.Getter; -import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; -public class CopyFromParam { +/** + * copy from desc ==> copy from param + */ +public class CopyFromDesc { private static final Logger LOG = LogManager.getLogger(CopyFromParam.class); private static final String DOLLAR = "$"; - - @Getter private StageAndPattern stageAndPattern; - @Getter - private List<Expr> exprList; - @Getter - private Expr fileFilterExpr; - @Getter + private List<NamedExpression> exprList; + private Optional<Expression> fileFilterExpr; private List<String> fileColumns; - @Getter - private List<Expr> columnMappingList; - @Setter + private List<Expression> columnMappingList; private List<String> targetColumns; - public CopyFromParam(StageAndPattern stageAndPattern) { + public CopyFromDesc(StageAndPattern stageAndPattern) { this.stageAndPattern = stageAndPattern; } - public CopyFromParam(StageAndPattern stageAndPattern, List<Expr> exprList, Expr whereExpr) { + public CopyFromDesc(StageAndPattern stageAndPattern, List<NamedExpression> exprList, + Optional<Expression> whereExpr) { this.stageAndPattern = stageAndPattern; this.exprList = exprList; this.fileFilterExpr = whereExpr; } - public void analyze(String fullDbName, TableName tableName, boolean useDeleteSign, String fileType) + public void setTargetColumns(List<String> targetColumns) { + this.targetColumns = targetColumns; + } + + public StageAndPattern getStageAndPattern() { + return stageAndPattern; + } + + public List<Expression> getColumnMappingList() { + return columnMappingList; + } + + public List<NamedExpression> getExprList() { + return exprList; + } + + public List<String> getFileColumns() { + return fileColumns; + } + + public Optional<Expression> getFileFilterExpr() { + return fileFilterExpr; + } + + public List<String> getTargetColumns() { + return targetColumns; + } + + /** + * analyze + */ + public void validate(String fullDbName, TableName tableName, boolean useDeleteSign, String fileType) throws AnalysisException { if (exprList == null && fileFilterExpr == null && !useDeleteSign) { return; @@ -136,14 +172,15 @@ public class CopyFromParam { parseColumnNames(fileType, fileColumns); if (exprList != null) { - if (targetColumns.size() != exprList.size()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT); - } - for (int i = 0; i < targetColumns.size(); i++) { - Expr expr = exprList.get(i); - BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, - new SlotRef(null, targetColumns.get(i)), expr); - columnMappingList.add(binaryPredicate); + if (!(exprList.size() == 1 && exprList.get(0) instanceof UnboundStar)) { + if (targetColumns.size() != exprList.size()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT); + } + for (int i = 0; i < targetColumns.size(); i++) { + Expression expr = exprList.get(i); + EqualTo binaryPredicate = new EqualTo(new UnboundSlot(targetColumns.get(i)), expr); + columnMappingList.add(binaryPredicate); + } } } else { for (int i = 0; i < targetColumns.size(); i++) { @@ -152,9 +189,8 @@ public class CopyFromParam { // mode. Because if the src data is an expr, strict mode judgment will // not be performed. if (!fileColumns.get(i).equalsIgnoreCase(targetColumns.get(i))) { - BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, - new SlotRef(null, targetColumns.get(i)), - new SlotRef(null, fileColumns.get(i))); + EqualTo binaryPredicate = new EqualTo(new UnboundSlot(targetColumns.get(i)), + new UnboundSlot(fileColumns.get(i))); columnMappingList.add(binaryPredicate); } } @@ -167,7 +203,7 @@ public class CopyFromParam { return false; } List<SlotRef> slotRefs = Lists.newArrayList(); - Expr.collectList(exprList, SlotRef.class, slotRefs); + // Expr.collectList(exprList, SlotRef.class, slotRefs); Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); for (SlotRef slotRef : slotRefs) { String columnName = slotRef.getColumnName(); @@ -182,7 +218,7 @@ public class CopyFromParam { } } if (addDeleteSign) { - exprList.add(new SlotRef(null, Column.DELETE_SIGN)); + // exprList.add(new SlotRef(null, Column.DELETE_SIGN)); fileColumns.add(Column.DELETE_SIGN); } return true; @@ -200,29 +236,29 @@ public class CopyFromParam { private int getMaxFileColumnId() throws AnalysisException { int maxId = 0; if (exprList != null) { - int maxFileColumnId = getMaxFileColumnId(exprList); + int maxFileColumnId = getMaxFileFilterColumnId( + exprList.stream().map(expr -> (Expression) expr).collect(Collectors.toList())); maxId = maxId > maxFileColumnId ? maxId : maxFileColumnId; } if (fileFilterExpr != null) { - int maxFileColumnId = getMaxFileColumnId(Lists.newArrayList(fileFilterExpr)); + int maxFileColumnId = getMaxFileFilterColumnId(Lists.newArrayList(fileFilterExpr.get())); maxId = maxId > maxFileColumnId ? maxId : maxFileColumnId; } return maxId; } - private int getMaxFileColumnId(List<Expr> exprList) throws AnalysisException { - List<SlotRef> slotRefs = Lists.newArrayList(); - Expr.collectList(exprList, SlotRef.class, slotRefs); + private int getMaxFileFilterColumnId(List<Expression> exprList) throws AnalysisException { + Set<Slot> slots = ExpressionUtils.getInputSlotSet(exprList); int maxId = 0; - for (SlotRef slotRef : slotRefs) { - int fileColumnId = getFileColumnIdOfSlotRef(slotRef); + for (Slot slot : slots) { + int fileColumnId = getFileColumnIdOfSlotRef((UnboundSlot) slot); maxId = fileColumnId < maxId ? maxId : fileColumnId; } return maxId; } - private int getFileColumnIdOfSlotRef(SlotRef slotRef) throws AnalysisException { - String columnName = slotRef.getColumnName(); + private int getFileColumnIdOfSlotRef(UnboundSlot unboundSlot) throws AnalysisException { + String columnName = unboundSlot.getName(); try { if (!columnName.startsWith(DOLLAR)) { throw new AnalysisException("can not mix column name and dollar sign"); @@ -245,23 +281,4 @@ public class CopyFromParam { } } } - - public String toSql() { - StringBuilder sb = new StringBuilder(); - if (columnMappingList != null || fileFilterExpr != null) { - sb.append("(SELECT "); - if (columnMappingList != null) { - Joiner.on(", ").appendTo(sb, - Lists.transform(columnMappingList, (Function<Expr, Object>) expr -> expr.toSql())); - } - sb.append(" FROM ").append(stageAndPattern.toSql()); - if (fileFilterExpr != null) { - sb.append(" WHERE ").append(fileFilterExpr.toSql()); - } - sb.append(")"); - } else { - sb.append(stageAndPattern.toSql()); - } - return sb.toString(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java new file mode 100644 index 00000000000..bb4dde7143e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java @@ -0,0 +1,358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.CopyFromParam; +import org.apache.doris.analysis.CopyIntoProperties; +import org.apache.doris.analysis.CopyStmt; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LabelName; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.Separator; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StageAndPattern; +import org.apache.doris.analysis.StageProperties; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB; +import org.apache.doris.cloud.proto.Cloud.StagePB; +import org.apache.doris.cloud.proto.Cloud.StagePB.StageType; +import org.apache.doris.cloud.stage.StageUtil; +import org.apache.doris.cloud.storage.RemoteBase; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.property.constants.BosProperties; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.Scope; +import org.apache.doris.nereids.analyzer.UnboundRelation; +import org.apache.doris.nereids.glue.translator.ExpressionTranslator; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.nereids.jobs.executor.Rewriter; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.analysis.BindRelation; +import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer; +import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * copy into informations + */ +public class CopyIntoInfo { + private static final Logger LOG = LogManager.getLogger(CopyIntoInfo.class); + + private static final String S3_BUCKET = "bucket"; + private static final String S3_PREFIX = "prefix"; + + private final List<String> nameParts; + private CopyFromDesc copyFromDesc; + private CopyFromParam legacyCopyFromParam; + private CopyIntoProperties copyIntoProperties; + private Map<String, Map<String, String>> optHints; + + private LabelName label = null; + private BrokerDesc brokerDesc = null; + private DataDescription dataDescription = null; + private final Map<String, String> brokerProperties = new HashMap<>(); + private Map<String, String> properties = new HashMap<>(); + + private String stage; + private String stageId; + private StageType stageType; + private String stagePrefix; + private RemoteBase.ObjectInfo objectInfo; + private String userName; + private TableName tableName; + + /** + * copy into informations + */ + public CopyIntoInfo(List<String> nameParts, CopyFromDesc copyFromDesc, + Map<String, String> properties, Map<String, Map<String, String>> optHints) { + this.nameParts = nameParts; + this.copyFromDesc = copyFromDesc; + Map<String, String> newProperties = new HashMap<>(); + for (String key : properties.keySet()) { + newProperties.put(key, properties.get(key)); + } + this.copyIntoProperties = new CopyIntoProperties(newProperties); + this.optHints = optHints; + this.stage = copyFromDesc.getStageAndPattern().getStageName(); + } + + /** + * validate copy into information + */ + public void validate(ConnectContext ctx) throws DdlException, AnalysisException { + if (this.optHints != null && this.optHints.containsKey(SessionVariable.CLOUD_CLUSTER)) { + ((CloudEnv) Env.getCurrentEnv()) + .checkCloudClusterPriv(this.optHints.get("set_var").get(SessionVariable.CLOUD_CLUSTER)); + } + // generate a label + String labelName = "copy_" + DebugUtil.printId(ctx.queryId()).replace("-", "_"); + String ctl = null; + String db = null; + String table = null; + switch (nameParts.size()) { + case 1: { // table + ctl = ctx.getDefaultCatalog(); + if (Strings.isNullOrEmpty(ctl)) { + ctl = InternalCatalog.INTERNAL_CATALOG_NAME; + } + db = ctx.getDatabase(); + if (Strings.isNullOrEmpty(db)) { + throw new AnalysisException("Please specify a database name."); + } + table = nameParts.get(0); + break; + } + case 2: + // db.table + // Use database name from table name parts. + break; + case 3: { + // catalog.db.table + ctl = nameParts.get(0); + db = nameParts.get(1); + table = nameParts.get(2); + break; + } + default: + throw new IllegalStateException("Table name [" + nameParts + "] is invalid."); + } + tableName = new TableName(ctl, db, table); + label = new LabelName(tableName.getDb(), labelName); + if (stage.isEmpty()) { + throw new AnalysisException("Stage name can not be empty"); + } + this.userName = ClusterNamespace.getNameFromFullName(ctx.getCurrentUserIdentity().getQualifiedUser()); + doValidate(userName, db, true); + } + + /** + * do validate + */ + public void doValidate(String user, String db, boolean checkAuth) throws AnalysisException, DdlException { + // get stage from meta service + StagePB stagePB = StageUtil.getStage(stage, userName, true); + validateStagePB(stagePB); + // generate broker desc + brokerDesc = new BrokerDesc("S3", StorageBackend.StorageType.S3, brokerProperties); + // generate data description + String filePath = "s3://" + brokerProperties.get(S3_BUCKET) + "/" + brokerProperties.get(S3_PREFIX); + Separator separator = copyIntoProperties.getColumnSeparator() != null ? new Separator( + copyIntoProperties.getColumnSeparator()) : null; + String fileFormatStr = copyIntoProperties.getFileType(); + Map<String, String> dataDescProperties = copyIntoProperties.getDataDescriptionProperties(); + copyFromDesc.validate(db, tableName, this.copyIntoProperties.useDeleteSign(), + copyIntoProperties.getFileTypeIgnoreCompression()); + if (LOG.isDebugEnabled()) { + LOG.debug("copy into params. sql: {}, fileColumns: {}, columnMappingList: {}, filter: {}", + copyFromDesc.getFileColumns().toString(), copyFromDesc.getColumnMappingList().toString(), + copyFromDesc.getFileFilterExpr().toString()); + } + + List<String> nameParts = Lists.newArrayList(); + nameParts.add(db); + nameParts.add(tableName.getTbl()); + Plan unboundRelation = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts); + CascadesContext cascadesContext = CascadesContext.initContext(ConnectContext.get().getStatementContext(), + unboundRelation, PhysicalProperties.ANY); + Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext, + ImmutableList.of(Rewriter.bottomUp(new BindRelation()))).execute(); + Plan boundRelation = cascadesContext.getRewritePlan(); + // table could have delete sign in LogicalFilter above + if (cascadesContext.getRewritePlan() instanceof LogicalFilter) { + boundRelation = (Plan) ((LogicalFilter) cascadesContext.getRewritePlan()).child(); + } + PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext); + List<Slot> slots = boundRelation.getOutput(); + Scope scope = new Scope(slots); + ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false); + + Map<SlotReference, SlotRef> translateMap = Maps.newHashMap(); + + TupleDescriptor tupleDescriptor = context.generateTupleDesc(); + tupleDescriptor.setTable(((OlapScan) boundRelation).getTable()); + for (int i = 0; i < boundRelation.getOutput().size(); i++) { + SlotReference slotReference = (SlotReference) boundRelation.getOutput().get(i); + SlotRef slotRef = new SlotRef(null, slotReference.getName()); + translateMap.put(slotReference, slotRef); + context.createSlotDesc(tupleDescriptor, slotReference, ((OlapScan) boundRelation).getTable()); + } + + List<Expr> legacyColumnMappingList = null; + if (copyFromDesc.getColumnMappingList() != null && !copyFromDesc.getColumnMappingList().isEmpty()) { + legacyColumnMappingList = new ArrayList<>(); + for (Expression expression : copyFromDesc.getColumnMappingList()) { + legacyColumnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext)); + } + } + Expr legacyFileFilterExpr = null; + if (copyFromDesc.getFileFilterExpr().isPresent()) { + legacyFileFilterExpr = translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(), + analyzer, context, cascadesContext); + } + + dataDescription = new DataDescription(tableName.getTbl(), null, Lists.newArrayList(filePath), + copyFromDesc.getFileColumns(), separator, fileFormatStr, null, false, + legacyColumnMappingList, legacyFileFilterExpr, null, LoadTask.MergeType.APPEND, null, + null, dataDescProperties); + dataDescription.setCompressType(StageUtil.parseCompressType(copyIntoProperties.getCompression())); + if (!(copyFromDesc.getColumnMappingList() == null + || copyFromDesc.getColumnMappingList().isEmpty())) { + dataDescription.setIgnoreCsvRedundantCol(true); + } + // analyze data description + if (checkAuth) { + dataDescription.analyze(db); + } else { + dataDescription.analyzeWithoutCheckPriv(db); + } + String path; + for (int i = 0; i < dataDescription.getFilePaths().size(); i++) { + path = dataDescription.getFilePaths().get(i); + dataDescription.getFilePaths().set(i, BosProperties.convertPathToS3(path)); + StorageBackend.checkPath(path, brokerDesc.getStorageType(), null); + dataDescription.getFilePaths().set(i, path); + } + + try { + properties.putAll(copyIntoProperties.getExecProperties()); + // TODO support exec params as LoadStmt + LoadStmt.checkProperties(properties); + } catch (DdlException e) { + throw new AnalysisException(e.getMessage()); + } + + // translate copy from description to copy from param + legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context, cascadesContext); + } + + private CopyFromParam toLegacyParam(CopyFromDesc copyFromDesc, ExpressionAnalyzer analyzer, + PlanTranslatorContext context, CascadesContext cascadesContext) { + StageAndPattern stageAndPattern = copyFromDesc.getStageAndPattern(); + List<Expr> exprList = null; + if (copyFromDesc.getExprList() != null) { + exprList = new ArrayList<>(); + for (Expression expression : copyFromDesc.getExprList()) { + exprList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext)); + } + } + Expr fileFilterExpr = null; + if (copyFromDesc.getFileFilterExpr().isPresent()) { + fileFilterExpr = translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(), + analyzer, context, cascadesContext); + } + List<String> fileColumns = copyFromDesc.getFileColumns(); + List<Expr> columnMappingList = null; + if (copyFromDesc.getColumnMappingList() != null) { + columnMappingList = new ArrayList<>(); + for (Expression expression : copyFromDesc.getColumnMappingList()) { + columnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext)); + } + } + List<String> targetColumns = copyFromDesc.getTargetColumns(); + return new CopyFromParam(stageAndPattern, exprList, fileFilterExpr, fileColumns, columnMappingList, + targetColumns); + } + + private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer analyzer, PlanTranslatorContext context, + CascadesContext cascadesContext) { + Expression expression; + try { + expression = analyzer.analyze(expr, new ExpressionRewriteContext(cascadesContext)); + } catch (org.apache.doris.nereids.exceptions.AnalysisException e) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("In where clause '" + + expr.toSql() + "', " + + Utils.convertFirstChar(e.getMessage())); + } + ExpressionToExpr translator = new ExpressionToExpr(); + return expression.accept(translator, context); + } + + private static class ExpressionToExpr extends ExpressionTranslator { + @Override + public Expr visitCast(Cast cast, PlanTranslatorContext context) { + // left child of cast is target type, right child of cast is expression + return new CastExpr(cast.getDataType().toCatalogDataType(), + cast.child().accept(this, context), null); + } + } + + // after validateStagePB, fileFormat and copyOption is not null + private void validateStagePB(StagePB stagePB) throws AnalysisException { + stageType = stagePB.getType(); + stageId = stagePB.getStageId(); + ObjectStoreInfoPB objInfo = stagePB.getObjInfo(); + stagePrefix = objInfo.getPrefix(); + objectInfo = RemoteBase.analyzeStageObjectStoreInfo(stagePB); + brokerProperties.put(S3Properties.Env.ENDPOINT, objInfo.getEndpoint()); + brokerProperties.put(S3Properties.Env.REGION, objInfo.getRegion()); + brokerProperties.put(S3Properties.Env.ACCESS_KEY, objectInfo.getAk()); + brokerProperties.put(S3Properties.Env.SECRET_KEY, objectInfo.getSk()); + if (objectInfo.getToken() != null) { + brokerProperties.put(S3Properties.Env.TOKEN, objectInfo.getToken()); + } + brokerProperties.put(S3_BUCKET, objInfo.getBucket()); + brokerProperties.put(S3_PREFIX, objInfo.getPrefix()); + // S3 Provider properties should be case insensitive. + brokerProperties.put(S3Properties.PROVIDER, objInfo.getProvider().toString().toUpperCase()); + StageProperties stageProperties = new StageProperties(stagePB.getPropertiesMap()); + this.copyIntoProperties.mergeProperties(stageProperties); + this.copyIntoProperties.analyze(); + } + + public CopyStmt toLegacyStatement(OriginStatement originStmt) { + return new CopyStmt(tableName, legacyCopyFromParam, copyIntoProperties, optHints, label, stageId, stageType, + stagePrefix, objectInfo, userName, brokerProperties, properties, dataDescription, brokerDesc, originStmt); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index bbb197b613e..a0d55d59777 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand; import org.apache.doris.nereids.trees.plans.commands.CleanAllProfileCommand; import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.CopyIntoCommand; import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand; import org.apache.doris.nereids.trees.plans.commands.CreateEncryptkeyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand; @@ -263,6 +264,10 @@ public interface CommandVisitor<R, C> { return visitCommand(exportCommand, context); } + default R visitCopyIntoCommand(CopyIntoCommand copyIntoCommand, C context) { + return visitCommand(copyIntoCommand, context); + } + default R visitCreateEncryptKeyCommand(CreateEncryptkeyCommand createEncryptKeyCommand, C context) { return visitCommand(createEncryptKeyCommand, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 68fa34071f6..6d3475dbebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -454,7 +454,7 @@ public class DdlExecutor { } } - private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws Exception { + public static void executeCopyStmt(Env env, CopyStmt copyStmt) throws Exception { CopyJob job = (CopyJob) (((CloudLoadManager) env.getLoadManager()).createLoadJobFromStmt(copyStmt)); if (!copyStmt.isAsync()) { // wait for execute finished @@ -475,7 +475,7 @@ public class DdlExecutor { entry.add(loadingStatus.getTrackingUrl()); result.add(entry); queryState.setResultSet(new ShowResultSet(copyStmt.getMetaData(), result)); - copyStmt.getAnalyzer().getContext().setState(queryState); + ConnectContext.get().setState(queryState); return; } else if (job.getState() == JobState.FINISHED) { EtlStatus loadingStatus = job.getLoadingStatus(); @@ -493,7 +493,7 @@ public class DdlExecutor { entry.add(loadingStatus.getTrackingUrl()); result.add(entry); queryState.setResultSet(new ShowResultSet(copyStmt.getMetaData(), result)); - copyStmt.getAnalyzer().getContext().setState(queryState); + ConnectContext.get().setState(queryState); return; } } @@ -510,7 +510,7 @@ public class DdlExecutor { entry.add(""); result.add(entry); queryState.setResultSet(new ShowResultSet(copyStmt.getMetaData(), result)); - copyStmt.getAnalyzer().getContext().setState(queryState); + ConnectContext.get().setState(queryState); } private static void waitJobCompleted(CopyJob job) throws InterruptedException { diff --git a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy index bd477d99bf6..ff1cb7bdb83 100644 --- a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy +++ b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy @@ -103,10 +103,10 @@ suite("test_copy_into", "p0") { def errorMsgs = [ "", - "quality not good enough to cancel", + "errCode = 2, detailMessage = In where clause '(p_type = not_exist)', unknown column 'not_exist' in 'table list", "", "", - "quality not good enough to cancel", + "errCode = 2, detailMessage = In where clause '(p_type = not_exist)', unknown column 'not_exist' in 'table list", "", "", "", @@ -127,8 +127,17 @@ suite("test_copy_into", "p0") { for (int i = 0; i < tartgetColumnsList.size(); i++) { sql "$dropTable" sql "$createTable" + if (i == 1 || i == 4) { + try { + result = do_copy_into.call(tableName, tartgetColumnsList[i], selectColumnsList[i], + externalStageName, filePrefix, whereExprs[i]) + } catch (Exception e) { + assertEquals(errorMsgs[i], e.getMessage()) + } + continue; + } result = do_copy_into.call(tableName, tartgetColumnsList[i], selectColumnsList[i], - externalStageName, filePrefix, whereExprs[i]) + externalStageName, filePrefix, whereExprs[i]) logger.info("i: " + i + ", copy result: " + result) assertTrue(result.size() == 1) if (result[0][1].equals("FINISHED")) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org