This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cdba4936b48 [feature](nereids) Support group commit insert (#26075) cdba4936b48 is described below commit cdba4936b48b4951d670649a68fef363c9cfab25 Author: 赵硕 <1443539...@qq.com> AuthorDate: Fri Nov 10 14:20:14 2023 +0800 [feature](nereids) Support group commit insert (#26075) --- .../apache/doris/analysis/NativeInsertStmt.java | 57 +-- .../doris/nereids/jobs/executor/Rewriter.java | 4 - .../org/apache/doris/nereids/rules/RuleType.java | 2 - .../rules/analysis/RejectGroupCommitInsert.java | 53 --- .../plans/commands/InsertIntoTableCommand.java | 78 +++- .../apache/doris/planner/GroupCommitPlanner.java | 196 ++++++++++ .../java/org/apache/doris/qe/StmtExecutor.java | 41 +-- .../data/insert_p0/insert_group_commit_into.out | 95 +++++ .../insert_p0/insert_group_commit_into.groovy | 348 +++++++++--------- .../insert_group_commit_with_exception.groovy | 402 +++++++++++---------- .../insert_group_commit_with_large_data.groovy | 89 ++--- 11 files changed, 836 insertions(+), 529 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index d4802ea956c..27e4c6bf36c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -50,24 +50,14 @@ import org.apache.doris.planner.DataSink; import org.apache.doris.planner.ExportSink; import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.planner.GroupCommitOlapTableSink; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.planner.external.jdbc.JdbcTableSink; -import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.service.FrontendOptions; import org.apache.doris.tablefunction.GroupCommitTableValuedFunction; -import org.apache.doris.task.StreamLoadTask; -import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TExecPlanFragmentParamsList; -import org.apache.doris.thrift.TFileCompressType; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TMergeType; import org.apache.doris.thrift.TQueryOptions; -import org.apache.doris.thrift.TScanRangeParams; -import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; @@ -84,10 +74,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -167,6 +155,7 @@ public class NativeInsertStmt extends InsertStmt { // true if be generates an insert from group commit tvf stmt and executes to load data public boolean isGroupCommitTvf = false; public boolean isGroupCommitStreamLoadSql = false; + private GroupCommitPlanner groupCommitPlanner; private boolean isFromDeleteOrUpdateStmt = false; @@ -1134,10 +1123,10 @@ public class NativeInsertStmt extends InsertStmt { return isGroupCommit; } - public void planForGroupCommit(TUniqueId queryId) throws UserException, TException { + public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException { OlapTable olapTable = (OlapTable) getTargetTable(); if (execPlanFragmentParamsBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) { - return; + return groupCommitPlanner; } if (!targetColumns.isEmpty()) { Analyzer analyzerTmp = analyzer; @@ -1145,45 +1134,11 @@ public class NativeInsertStmt extends InsertStmt { this.analyzer = analyzerTmp; } analyzeSubquery(analyzer, true); - TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest(); - if (targetColumnNames != null) { - streamLoadPutRequest.setColumns(String.join(",", targetColumnNames)); - if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) { - streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL); - } - } - streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1) - .setTbl(getTbl()) - .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) - .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId) - .setGroupCommit(true); - StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); - StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask); - // Will using load id as query id in fragment - TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); - for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) { - for (TScanRangeParams scanRangeParams : entry.getValue()) { - scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( - TFileFormatType.FORMAT_PROTO); - scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( - TFileCompressType.PLAIN); - } - } - List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream() - .flatMap(Collection::stream).collect(Collectors.toList()); - Preconditions.checkState(scanRangeParams.size() == 1); + groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId); // save plan message to be reused for prepare stmt loadId = queryId; baseSchemaVersion = olapTable.getBaseSchemaVersion(); - // see BackendServiceProxy#execPlanFragmentsAsync - TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); - paramsList.addToParamsList(tRequest); - execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList)); - } - - public InternalService.PExecPlanFragmentRequest getExecPlanFragmentRequest() { - return InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes) - .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build(); + return groupCommitPlanner; } public TUniqueId getLoadId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index fe6bc13b6c4..802ddaa6c0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -28,7 +28,6 @@ import org.apache.doris.nereids.rules.analysis.CheckAfterRewrite; import org.apache.doris.nereids.rules.analysis.EliminateGroupByConstant; import org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject; import org.apache.doris.nereids.rules.analysis.NormalizeAggregate; -import org.apache.doris.nereids.rules.analysis.RejectGroupCommitInsert; import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite; import org.apache.doris.nereids.rules.expression.ExpressionNormalization; import org.apache.doris.nereids.rules.expression.ExpressionOptimization; @@ -269,9 +268,6 @@ public class Rewriter extends AbstractBatchJobExecutor { topDown(new BuildAggForUnion()) ), - // TODO remove it after Nereids support group commit insert - topDown(new RejectGroupCommitInsert()), - // topic("Distinct", // costBased(custom(RuleType.PUSH_DOWN_DISTINCT_THROUGH_JOIN, PushdownDistinctThroughJoin::new)) // ), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 505c7db91a7..94ccef528fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -77,8 +77,6 @@ public enum RuleType { ANALYZE_CTE(RuleTypeClass.REWRITE), RELATION_AUTHENTICATION(RuleTypeClass.VALIDATION), - REJECT_GROUP_COMMIT_INSERT(RuleTypeClass.REWRITE), - ADJUST_NULLABLE_FOR_PROJECT_SLOT(RuleTypeClass.REWRITE), ADJUST_NULLABLE_FOR_AGGREGATE_SLOT(RuleTypeClass.REWRITE), ADJUST_NULLABLE_FOR_HAVING_SLOT(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java deleted file mode 100644 index 19c9706e03d..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.rules.analysis; - -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.rules.Rule; -import org.apache.doris.nereids.rules.RuleType; -import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory; - -import com.google.common.collect.ImmutableList; - -import java.util.List; - -/** - * reject group commit insert added by PR <a href="https://github.com/apache/doris/pull/22829/files">#22829</a> - */ -public class RejectGroupCommitInsert implements RewriteRuleFactory { - - @Override - public List<Rule> buildRules() { - return ImmutableList.of( - logicalOlapTableSink(logicalOneRowRelation()) - .thenApply(ctx -> { - if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) { - throw new AnalysisException("Nereids do not support group commit now."); - } - return null; - }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT), - logicalOlapTableSink(logicalUnion().when(u -> u.arity() == 0)) - .thenApply(ctx -> { - if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) { - throw new AnalysisException("Nereids do not support group commit now."); - } - return null; - }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT) - ); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index f7c30f3820b..79dc9485fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -22,9 +22,11 @@ import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DropPartitionClause; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.ReplacePartitionClause; import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; @@ -39,6 +41,7 @@ import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.TreeNode; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; @@ -46,12 +49,19 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.txn.Transaction; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.UnionNode; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -59,6 +69,7 @@ import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; @@ -68,6 +79,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * insert into select command implementation @@ -152,7 +165,13 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink()); - + if (ctx.getSessionVariable().enableInsertGroupCommit) { + // group commit + if (analyzeGroupCommit(sink, physicalOlapTableSink)) { + handleGroupCommit(ctx, sink, physicalOlapTableSink); + return; + } + } Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn"); Transaction txn = new Transaction(ctx, physicalOlapTableSink.getDatabase(), @@ -352,6 +371,63 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } } + private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink, + PhysicalOlapTableSink<?> physicalOlapTableSink) + throws UserException, RpcException, TException, ExecutionException, InterruptedException { + + List<InternalService.PDataRow> rows = new ArrayList<>(); + List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment().getPlanRoot()) + .getMaterializedConstExprLists(); + + int filterSize = 0; + for (Slot slot : physicalOlapTableSink.getOutput()) { + if (slot.getName().contains(Column.DELETE_SIGN) + || slot.getName().contains(Column.VERSION_COL)) { + filterSize += 1; + } + } + for (List<Expr> list : materializedConstExprLists) { + rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize)); + } + GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(), + physicalOlapTableSink.getTargetTable(), null, ctx.queryId()); + Future<PGroupCommitInsertResponse> future = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); + PGroupCommitInsertResponse response = future.get(); + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code == TStatusCode.DATA_QUALITY_ERROR) { + LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, " + + "schema version: {}", ctx.queryId(), + groupCommitPlanner.getBackend(), response.getStatus(), + physicalOlapTableSink.getTargetTable().getBaseSchemaVersion()); + } else if (code != TStatusCode.OK) { + String errMsg = "group commit insert failed. backend id: " + + groupCommitPlanner.getBackend().getId() + ", status: " + + response.getStatus(); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + TransactionStatus txnStatus = TransactionStatus.PREPARE; + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(response.getLabel()).append("', 'status':'").append(txnStatus.name()); + sb.append("', 'txnId':'").append(response.getTxnId()).append("'"); + sb.append("', 'optimizer':'").append("nereids").append("'"); + sb.append("}"); + + ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb.toString()); + ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(), + physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(), + txnStatus, response.getLoadedRows(), (int) response.getFilteredRows()); + // update it, so that user can get loaded rows in fe.audit.log + ctx.updateReturnRows((int) response.getLoadedRows()); + } + + private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) { + return ConnectContext.get().getSessionVariable().enableInsertGroupCommit + && physicalOlapTableSink.getTargetTable() instanceof OlapTable + && !ConnectContext.get().isTxnModel() + && sink.getFragment().getPlanRoot() instanceof UnionNode + && physicalOlapTableSink.getPartitionIds().isEmpty(); + } + @Override public Plan getExplainPlan(ConnectContext ctx) { return this.logicalQuery; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java new file mode 100644 index 00000000000..15aa639abee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + + +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; +import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; +import org.apache.doris.proto.Types; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMergeType; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRangeParams; +import org.apache.doris.thrift.TStreamLoadPutRequest; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +// Used to generate a plan fragment for a group commit +// we only support OlapTable now. +public class GroupCommitPlanner { + private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class); + + private Database db; + private OlapTable table; + private TUniqueId loadId; + private Backend backend; + private TExecPlanFragmentParamsList paramsList; + private ByteString execPlanFragmentParamsBytes; + + public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId) + throws UserException, TException { + this.db = db; + this.table = table; + TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest(); + if (targetColumnNames != null) { + streamLoadPutRequest.setColumns(String.join(",", targetColumnNames)); + if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) { + streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL); + } + } + streamLoadPutRequest + .setDb(db.getFullName()) + .setMaxFilterRatio(1) + .setTbl(table.getName()) + .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) + .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId) + .setGroupCommit(true); + StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); + StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); + // Will using load id as query id in fragment + TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); + for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) { + for (TScanRangeParams scanRangeParams : entry.getValue()) { + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( + TFileFormatType.FORMAT_PROTO); + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( + TFileCompressType.PLAIN); + } + } + List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + Preconditions.checkState(scanRangeParams.size() == 1); + loadId = queryId; + // see BackendServiceProxy#execPlanFragmentsAsync + paramsList = new TExecPlanFragmentParamsList(); + paramsList.addToParamsList(tRequest); + execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList)); + } + + public Future<PGroupCommitInsertResponse> executeGroupCommitInsert(ConnectContext ctx, + List<InternalService.PDataRow> rows) throws TException, DdlException, RpcException { + backend = ctx.getInsertGroupCommit(this.table.getId()); + if (backend == null || !backend.isAlive()) { + List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (allBackendIds.isEmpty()) { + throw new DdlException("No alive backend"); + } + Collections.shuffle(allBackendIds); + backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0)); + ctx.setInsertGroupCommit(this.table.getId(), backend); + } + PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() + .setDbId(db.getId()) + .setTableId(table.getId()) + .setBaseSchemaVersion(table.getBaseSchemaVersion()) + .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder() + .setRequest(execPlanFragmentParamsBytes) + .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build()) + .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo) + .build()).addAllData(rows) + .build(); + Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance() + .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + return future; + } + + // only for nereids use + public static InternalService.PDataRow getRowStringValue(List<Expr> cols, int filterSize) throws UserException { + if (cols.isEmpty()) { + return null; + } + InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); + try { + List<Expr> exprs = cols.subList(0, cols.size() - filterSize); + for (Expr expr : exprs) { + if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) { + if (expr.getChildren().get(0) instanceof NullLiteral) { + row.addColBuilder().setValue("\\N"); + continue; + } + throw new UserException( + "do not support non-literal expr in transactional insert operation: " + expr.toSql()); + } + if (expr instanceof NullLiteral) { + row.addColBuilder().setValue("\\N"); + } else if (expr.getType() instanceof ArrayType) { + row.addColBuilder().setValue(expr.getStringValueForArray()); + } else if (!expr.getChildren().isEmpty()) { + expr.getChildren().forEach(child -> processExprVal(child, row)); + } else { + row.addColBuilder().setValue(expr.getStringValue()); + } + } + } catch (UserException e) { + throw new RuntimeException(e); + } + return row.build(); + } + + private static void processExprVal(Expr expr, InternalService.PDataRow.Builder row) { + if (expr.getChildren().isEmpty()) { + row.addColBuilder().setValue(expr.getStringValue()); + return; + } + for (Expr child : expr.getChildren()) { + processExprVal(child, row); + } + } + + public Backend getBackend() { + return backend; + } + + public TExecPlanFragmentParamsList getParamsList() { + return paramsList; + } + +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5656ce9d4ff..5fa8a8205cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -126,13 +126,13 @@ import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.Forward; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; -import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; @@ -145,19 +145,16 @@ import org.apache.doris.resource.workloadgroup.QueryQueue; import org.apache.doris.resource.workloadgroup.QueueOfferToken; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; -import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; -import org.apache.doris.system.Backend; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLoadTxnBeginRequest; import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TMergeType; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TResultBatch; @@ -1828,19 +1825,9 @@ public class StmtExecutor { txnId = context.getTxnEntry().getTxnConf().getTxnId(); } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) { NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt; - Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId()); - if (backend == null || !backend.isAlive()) { - List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0)); - context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend); - } int maxRetry = 3; for (int i = 0; i < maxRetry; i++) { - nativeInsertStmt.planForGroupCommit(context.queryId); + GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId); // handle rows List<InternalService.PDataRow> rows = new ArrayList<>(); SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt(); @@ -1861,23 +1848,15 @@ public class StmtExecutor { InternalService.PDataRow data = getRowStringValue(exprList); rows.add(data); } - TUniqueId loadId = nativeInsertStmt.getLoadId(); - PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() - .setDbId(insertStmt.getTargetTable().getDatabase().getId()) - .setTableId(insertStmt.getTargetTable().getId()) - .setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion()) - .setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest()) - .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo) - .build()).addAllData(rows) - .build(); - Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance() - .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + Future<PGroupCommitInsertResponse> future = groupCommitPlanner + .executeGroupCommitInsert(context, rows); PGroupCommitInsertResponse response = future.get(); TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); if (code == TStatusCode.DATA_QUALITY_ERROR) { LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, " + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt, - backend.getId(), response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i); + groupCommitPlanner.getBackend().getId(), + response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i); if (i < maxRetry) { List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable()); MetaLockUtils.readLockTables(tables); @@ -1890,12 +1869,12 @@ public class StmtExecutor { } continue; } else { - errMsg = "group commit insert failed. backend id: " + backend.getId() + ", status: " - + response.getStatus(); + errMsg = "group commit insert failed. backend id: " + + groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus(); } } else if (code != TStatusCode.OK) { - errMsg = "group commit insert failed. backend id: " + backend.getId() + ", status: " - + response.getStatus(); + errMsg = "group commit insert failed. backend id: " + groupCommitPlanner.getBackend().getId() + + ", status: " + response.getStatus(); ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } label = response.getLabel(); diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out b/regression-test/data/insert_p0/insert_group_commit_into.out index 97fc1897552..dfb3a3b41c0 100644 --- a/regression-test/data/insert_p0/insert_group_commit_into.out +++ b/regression-test/data/insert_p0/insert_group_commit_into.out @@ -94,3 +94,98 @@ q 50 -- !sql -- 0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0 3229b7cd-f3a2-4359-aa24-946388c9cc54 0 CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...] +-- !sql -- +1 a 10 +2 b -1 +3 c -1 +4 \N -1 +5 q 50 +6 \N -1 + +-- !sql -- +1 a 10 +1 a 10 +2 b -1 +2 b -1 +3 c -1 +3 c -1 +4 e1 -1 +5 q 50 +5 q 50 +6 \N -1 +6 \N -1 + +-- !sql -- +1 a \N 10 +1 a \N 10 +1 a \N 10 +2 b \N -1 +2 b \N -1 +2 b \N -1 +3 c \N -1 +3 c \N -1 +3 c \N -1 +4 \N \N -1 +4 e1 \N -1 +5 q \N 50 +5 q \N 50 +5 q \N 50 +6 \N \N -1 +6 \N \N -1 +6 \N \N -1 + +-- !sql -- +2 b \N -1 +6 \N \N -1 + +-- !sql -- +1 a 10 5 +2 b -1 \N +2 b -1 \N +3 c -1 \N +4 \N -1 \N +5 q 50 6 +6 \N -1 \N +6 \N -1 \N + +-- !sql -- +1 a 10 +1 a 10 +2 b -1 +2 b -1 +2 b -1 +3 c -1 +3 c -1 +4 \N -1 +4 \N -1 +5 q 50 +5 q 50 +6 \N -1 +6 \N -1 +6 \N -1 + +-- !sql -- +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +a 10 +a 10 +a 10 +b -1 +b -1 +b -1 +b -1 +c -1 +c -1 +c -1 +q 50 +q 50 +q 50 + +-- !sql -- +0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0 3229b7cd-f3a2-4359-aa24-946388c9cc54 0 CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...] + diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 26cc9ca9776..bea130d1067 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -77,173 +77,193 @@ suite("insert_group_commit_into") { assertTrue(serverInfo.contains("'status':'VISIBLE'")) assertTrue(!serverInfo.contains("'label':'group_commit_")) } + for (item in ["legacy", "nereids"]) { + try { + // create table + sql """ drop table if exists ${table}; """ - try { - // create table - sql """ drop table if exists ${table}; """ - - sql """ - CREATE TABLE ${table} ( - `id` int(11) NOT NULL, - `name` varchar(50) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - DUPLICATE KEY(`id`, `name`) - PARTITION BY RANGE(id) - ( - FROM (1) TO (100) INTERVAL 10 - ) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ); - """ - - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - // 1. insert into - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - getRowCount(6) - qt_sql """ select * from ${table} order by id, name, score asc; """ - - // 2. insert into and delete - sql """ delete from ${table} where id = 4; """ - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - /*sql """ insert into ${table}(id, name) values(4, 'd1'); """ - sql """ insert into ${table}(id, name) values(4, 'd1'); """ - sql """ delete from ${table} where id = 4; """*/ - group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - getRowCount(11) - qt_sql """ select * from ${table} order by id, name, score asc; """ - - // 3. insert into and light schema change: add column - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 - sql """ alter table ${table} ADD column age int after name; """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - assertTrue(getAlterTableState(), "add column should success") - getRowCount(17) - qt_sql """ select * from ${table} order by id, name,score asc; """ - - // 4. insert into and truncate table - /*sql """ insert into ${table}(name, id) values('c', 3); """ - sql """ insert into ${table}(id) values(4); """ - sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ - sql """ truncate table ${table}; """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - getRowCount(2) - qt_sql """ select * from ${table} order by id, name, score asc; """ - - // 5. insert into and schema change: modify column order - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 - // sql """ alter table ${table} order by (id, name, score, age); """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - // assertTrue(getAlterTableState(), "modify column order should success") - getRowCount(8) - qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ - - // 6. insert into and light schema change: drop column - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 - sql """ alter table ${table} DROP column age; """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - assertTrue(getAlterTableState(), "drop column should success") - getRowCount(14) - qt_sql """ select * from ${table} order by id, name, score asc; """ - - // 7. insert into and add rollup - group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 - // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 - - getRowCount(20) - qt_sql """ select name, score from ${table} order by name asc; """ - - none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 - none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 - none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 - def rowCount = sql "select count(*) from ${table}" - logger.info("row count: " + rowCount) - assertEquals(rowCount[0][0], 23) + sql """ + CREATE TABLE ${table} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + PARTITION BY RANGE(id) + ( + FROM (1) TO (100) INTERVAL 10 + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + // 1. insert into + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + getRowCount(6) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 2. insert into and delete + sql """ delete from ${table} where id = 4; """ + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + /*sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ delete from ${table} where id = 4; """*/ + group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + getRowCount(11) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 3. insert into and light schema change: add column + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 + sql """ alter table ${table} ADD column age int after name; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + assertTrue(getAlterTableState(), "add column should success") + getRowCount(17) + qt_sql """ select * from ${table} order by id, name,score asc; """ + + // 4. insert into and truncate table + /*sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ + sql """ truncate table ${table}; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + getRowCount(2) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 5. insert into and schema change: modify column order + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + // sql """ alter table ${table} order by (id, name, score, age); """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + // assertTrue(getAlterTableState(), "modify column order should success") + getRowCount(8) + qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ + + // 6. insert into and light schema change: drop column + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + sql """ alter table ${table} DROP column age; """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + assertTrue(getAlterTableState(), "drop column should success") + getRowCount(14) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 7. insert into and add rollup + group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${table}(id) values(4); """, 1 + group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 + // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ + group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + + getRowCount(20) + qt_sql """ select name, score from ${table} order by name asc; """ + + + if (item == "nereids") { + group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 + group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 + group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 + getRowCount(23) + } else { + none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 + none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 + none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 + } + + def rowCount = sql "select count(*) from ${table}" + logger.info("row count: " + rowCount) + assertEquals(rowCount[0][0], 23) + } + } finally { + // try_sql("DROP TABLE ${table}") } - } finally { - // try_sql("DROP TABLE ${table}") - } - // table with array type - tableName = "insert_group_commit_into_duplicate_array" - table = dbName + "." + tableName - try { - // create table - sql """ drop table if exists ${table}; """ - - sql """ - CREATE table ${table} ( - teamID varchar(255), - service_id varchar(255), - start_time BigInt, - time_bucket BigInt , - segment_id String , - trace_id String , - data_binary String , - end_time BigInt , - endpoint_id String , - endpoint_name String , - is_error Boolean , - latency Int , - service_instance_id String , - statement String , - tags Array<String> - ) UNIQUE key (`teamID`,`service_id`, `start_time`) - DISTRIBUTED BY hash(`start_time`) - BUCKETS 1 - PROPERTIES ("replication_allocation" = "tag.location.default: 1") - """ - - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - // 1. insert into - group_commit_insert """ - INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) - VALUES - ('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2 [...] - 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=t [...] - """, 1 - - getRowCount(1) - qt_sql """ select * from ${table}; """ + // table with array type + tableName = "insert_group_commit_into_duplicate_array" + table = dbName + "." + tableName + try { + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE table ${table} ( + teamID varchar(255), + service_id varchar(255), + start_time BigInt, + time_bucket BigInt , + segment_id String , + trace_id String , + data_binary String , + end_time BigInt , + endpoint_id String , + endpoint_name String , + is_error Boolean , + latency Int , + service_instance_id String , + statement String , + tags Array<String> + ) UNIQUE key (`teamID`,`service_id`, `start_time`) + DISTRIBUTED BY hash(`start_time`) + BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + // 1. insert into + group_commit_insert """ + INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) + VALUES + ('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdG [...] + 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_ [...] + """, 1 + + getRowCount(1) + qt_sql """ select * from ${table}; """ + } + } finally { + // try_sql("DROP TABLE ${table}") } - } finally { - // try_sql("DROP TABLE ${table}") } } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index 3eeaeb797f2..b2c2edb204d 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -52,231 +52,269 @@ suite("insert_group_commit_with_exception") { } return false } - - try { - // create table - sql """ drop table if exists ${table}; """ - - sql """ - CREATE TABLE `${table}` ( - `id` int(11) NOT NULL, - `name` varchar(1100) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - DUPLICATE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ); - """ - - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - // insert into without column - try { - def result = sql """ insert into ${table} values(1, 'a', 10, 100) """ - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } - - try { - def result = sql """ insert into ${table} values(2, 'b') """ - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } - - result = sql """ insert into ${table} values(3, 'c', 30) """ - logger.info("insert result: " + result) - - // insert into with column - result = sql """ insert into ${table}(id, name) values(4, 'd') """ - logger.info("insert result: " + result) - - getRowCount(2) - - try { - result = sql """ insert into ${table}(id, name) values(5, 'd', 50) """ - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } - - try { - result = sql """ insert into ${table}(id, name) values(6) """ - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } - + for (item in ["legacy", "nereids"]) { try { - result = sql """ insert into ${table}(id, names) values(7, 'd') """ - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Unknown column 'names'")) - } - + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE TABLE `${table}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } - // prepare insert - def db = context.config.defaultDb + "_insert_p0" - String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db) - - try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { - Statement statement = connection.createStatement(); - statement.execute("use ${db}"); - statement.execute("set enable_insert_group_commit = true;"); - // without column - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { - ps.setObject(1, 8); - ps.setObject(2, "f"); - ps.setObject(3, 70); - ps.setObject(4, "a"); - ps.addBatch(); - int[] result = ps.executeBatch(); + // insert into without column + try { + def result = sql """ insert into ${table} values(1, 'a', 10, 100) """ assertTrue(false) } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) + if (item == "nereids") { + assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) + } else { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } } - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?)")) { - ps.setObject(1, 9); - ps.setObject(2, "f"); - ps.addBatch(); - int[] result = ps.executeBatch(); + try { + def result = sql """ insert into ${table} values(2, 'b') """ assertTrue(false) } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) + if (item == "nereids") { + assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) + } else { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } } - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { - ps.setObject(1, 10); - ps.setObject(2, "f"); - ps.setObject(3, 90); - ps.addBatch(); - int[] result = ps.executeBatch(); - logger.info("prepare insert result: " + result) - } + result = sql """ insert into ${table} values(3, 'c', 30) """ + logger.info("insert result: " + result) - // with columns - try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) { - ps.setObject(1, 11); - ps.setObject(2, "f"); - ps.addBatch(); - int[] result = ps.executeBatch(); - logger.info("prepare insert result: " + result) - } + // insert into with column + result = sql """ insert into ${table}(id, name) values(4, 'd') """ + logger.info("insert result: " + result) + + getRowCount(2) - try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) { - ps.setObject(1, 12); - ps.setObject(2, "f"); - ps.setObject(3, "f"); - ps.addBatch(); - int[] result = ps.executeBatch(); + try { + result = sql """ insert into ${table}(id, name) values(5, 'd', 50) """ assertTrue(false) } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) + if (item == "nereids") { + assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) + } else { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } } - try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?)")) { - ps.setObject(1, 13); - ps.addBatch(); - int[] result = ps.executeBatch(); + try { + result = sql """ insert into ${table}(id, name) values(6) """ assertTrue(false) } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) + if (item == "nereids") { + assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) + } else { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } } - try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) { - ps.setObject(1, 12); - ps.setObject(2, "f"); - ps.addBatch(); - int[] result = ps.executeBatch(); + try { + result = sql """ insert into ${table}(id, names) values(7, 'd') """ assertTrue(false) } catch (Exception e) { - assertTrue(e.getMessage().contains("Unknown column 'names'")) + if (item == "nereids") { + assertTrue(e.getMessage().contains("column names is not found in table")) + } else { + assertTrue(e.getMessage().contains("Unknown column 'names'")) + } } - getRowCount(4) - // prepare insert with multi rows - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { - for (int i = 0; i < 5; i++) { - ps.setObject(1, 13 + i); + // prepare insert + def db = context.config.defaultDb + "_insert_p0" + String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db) + + if (item == "nereids") { + println("nereids does not support prepare insert"); + continue; + }; + + try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { + Statement statement = connection.createStatement(); + statement.execute("use ${db}"); + statement.execute("set enable_insert_group_commit = true;"); + if (item == "nereids") { + statement.execute("set enable_nereids_dml = true;"); + statement.execute("set enable_nereids_planner=true;"); + statement.execute("set enable_fallback_to_original_planner=false;"); + } else { + statement.execute("set enable_nereids_dml = false;"); + } + // without column + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { + ps.setObject(1, 8); + ps.setObject(2, "f"); + ps.setObject(3, 70); + ps.setObject(4, "a"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?)")) { + ps.setObject(1, 9); + ps.setObject(2, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + ps.setObject(1, 10); ps.setObject(2, "f"); ps.setObject(3, 90); ps.addBatch(); int[] result = ps.executeBatch(); logger.info("prepare insert result: " + result) } - } - getRowCount(9) - // prepare insert with multi rows - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) { - for (int i = 0; i < 2; i++) { - ps.setObject(1, 18 + i); + // with columns + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) { + ps.setObject(1, 11); ps.setObject(2, "f"); - ps.setObject(3, 90); - ps.setObject(4, 18 + i + 1); - ps.setObject(5, "f"); - ps.setObject(6, 90); ps.addBatch(); int[] result = ps.executeBatch(); logger.info("prepare insert result: " + result) } - } - getRowCount(13) - - // prepare insert without column names, and do schema change - try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { - ps.setObject(1, 22) - ps.setObject(2, "f") - ps.setObject(3, 90) - ps.addBatch() - int[] result = ps.executeBatch() - logger.info("prepare insert result: " + result) - - sql """ alter table ${table} ADD column age int after name; """ - assertTrue(getAlterTableState(), "add column should success") - - try { - ps.setObject(1, 23) + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) { + ps.setObject(1, 12); + ps.setObject(2, "f"); + ps.setObject(3, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?)")) { + ps.setObject(1, 13); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) { + ps.setObject(1, 12); + ps.setObject(2, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Unknown column 'names'")) + } + + getRowCount(4) + + // prepare insert with multi rows + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + for (int i = 0; i < 5; i++) { + ps.setObject(1, 13 + i); + ps.setObject(2, "f"); + ps.setObject(3, 90); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + } + getRowCount(9) + + // prepare insert with multi rows + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) { + for (int i = 0; i < 2; i++) { + ps.setObject(1, 18 + i); + ps.setObject(2, "f"); + ps.setObject(3, 90); + ps.setObject(4, 18 + i + 1); + ps.setObject(5, "f"); + ps.setObject(6, 90); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + } + getRowCount(13) + + // prepare insert without column names, and do schema change + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + ps.setObject(1, 22) + ps.setObject(2, "f") + ps.setObject(3, 90) + ps.addBatch() + int[] result = ps.executeBatch() + logger.info("prepare insert result: " + result) + + sql """ alter table ${table} ADD column age int after name; """ + assertTrue(getAlterTableState(), "add column should success") + + try { + ps.setObject(1, 23) + ps.setObject(2, "f") + ps.setObject(3, 90) + ps.addBatch() + result = ps.executeBatch() + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + } + getRowCount(14) + + // prepare insert with column names, and do schema change + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?, ?)")) { + ps.setObject(1, 24) + ps.setObject(2, "f") + ps.setObject(3, 90) + ps.addBatch() + int[] result = ps.executeBatch() + logger.info("prepare insert result: " + result) + + sql """ alter table ${table} DROP column age; """ + assertTrue(getAlterTableState(), "drop column should success") + + ps.setObject(1, 25) ps.setObject(2, "f") ps.setObject(3, 90) ps.addBatch() result = ps.executeBatch() - assertTrue(false) - } catch (Exception e) { - assertTrue(e.getMessage().contains("Column count doesn't match value count")) + logger.info("prepare insert result: " + result) } + getRowCount(16) } - getRowCount(14) - - // prepare insert with column names, and do schema change - try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?, ?)")) { - ps.setObject(1, 24) - ps.setObject(2, "f") - ps.setObject(3, 90) - ps.addBatch() - int[] result = ps.executeBatch() - logger.info("prepare insert result: " + result) - - sql """ alter table ${table} DROP column age; """ - assertTrue(getAlterTableState(), "drop column should success") - - ps.setObject(1, 25) - ps.setObject(2, "f") - ps.setObject(3, 90) - ps.addBatch() - result = ps.executeBatch() - logger.info("prepare insert result: " + result) - } - getRowCount(16) + } finally { + // try_sql("DROP TABLE ${table}") } - } finally { - // try_sql("DROP TABLE ${table}") } } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy index 1805905a2a1..c28b18d3797 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy @@ -48,52 +48,59 @@ suite("insert_group_commit_with_large_data") { assertTrue(serverInfo.contains("'label':'group_commit_")) } - try { - // create table - sql """ drop table if exists ${table}; """ + for (item in ["legacy", "nereids"]) { + try { + // create table + sql """ drop table if exists ${table}; """ - sql """ - CREATE TABLE `${table}` ( - `id` int(11) NOT NULL, - `name` varchar(1100) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - DUPLICATE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ); - """ + sql """ + CREATE TABLE `${table}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - sql """ use ${db}; """ + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + sql """ use ${db}; """ - // insert into 5000 rows - def insert_sql = """ insert into ${table} values(1, 'a', 10) """ - for (def i in 2..5000) { - insert_sql += """, (${i}, 'a', 10) """ - } - group_commit_insert insert_sql, 5000 - getRowCount(5000) + // insert into 5000 rows + def insert_sql = """ insert into ${table} values(1, 'a', 10) """ + for (def i in 2..5000) { + insert_sql += """, (${i}, 'a', 10) """ + } + group_commit_insert insert_sql, 5000 + getRowCount(5000) - // data size is large than 4MB, need " set global max_allowed_packet = 5508950 " - /*def name_value = "" - for (def i in 0..1024) { - name_value += 'a' - } - insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """ - for (def i in 2..5000) { - insert_sql += """, (${i}, '${name_value}', 10) """ + // data size is large than 4MB, need " set global max_allowed_packet = 5508950 " + /*def name_value = "" + for (def i in 0..1024) { + name_value += 'a' + } + insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """ + for (def i in 2..5000) { + insert_sql += """, (${i}, '${name_value}', 10) """ + } + result = sql """ ${insert_sql} """ + group_commit_insert insert_sql, 5000 + getRowCount(10000) + */ } - result = sql """ ${insert_sql} """ - group_commit_insert insert_sql, 5000 - getRowCount(10000) - */ + } finally { + // try_sql("DROP TABLE ${table}") } - } finally { - // try_sql("DROP TABLE ${table}") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org