This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ac21d76bc55 [fix](group commit) Fix group commit in nereids (#37002) ac21d76bc55 is described below commit ac21d76bc555ba465d65a067e8bc45be1ac496bc Author: meiyi <myime...@gmail.com> AuthorDate: Thu Jul 4 18:08:52 2024 +0800 [fix](group commit) Fix group commit in nereids (#37002) ## Proposed changes Before: the insert into values in group commit use the stream load way to load data, this may cause complex data type load error. After: use the original insert into values way(scanner node is union node), sink is group_commit_block_sink. --- .../exec/group_commit_block_sink_operator.cpp | 6 +- be/src/runtime/fragment_mgr.cpp | 4 + be/src/runtime/group_commit_mgr.cpp | 8 +- be/src/runtime/group_commit_mgr.h | 6 +- .../commands/insert/GroupCommitInsertExecutor.java | 256 --------------------- .../commands/insert/InsertIntoTableCommand.java | 22 +- .../insert/OlapGroupCommitInsertExecutor.java | 98 ++++++++ .../plans/commands/insert/OlapInsertExecutor.java | 9 +- .../commands/insert/OlapTxnInsertExecutor.java | 6 +- .../main/java/org/apache/doris/qe/Coordinator.java | 17 ++ gensrc/thrift/FrontendService.thrift | 2 + .../insert_p0/insert_group_commit_into.groovy | 27 +-- ...nsert_group_commit_into_max_filter_ratio.groovy | 14 +- .../insert_group_commit_into_unique.groovy | 6 +- ...nsert_group_commit_into_unique_sync_mode.groovy | 6 +- .../insert_p0/test_group_commit_timeout.groovy | 2 +- 16 files changed, 182 insertions(+), 307 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 402354d6f24..17088b37c3e 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -63,10 +63,14 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); if (_state->exec_env()->wal_mgr()->is_running()) { + std::string label; + int64_t txn_id; RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue, _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency, - _put_block_dependency)); + _put_block_dependency, label, txn_id)); + _state->set_import_label(label); + _state->set_wal_id(txn_id); // wal_id is txn_id return Status::OK(); } else { return Status::InternalError("be is stopping"); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index aa23f97f9fd..76048373286 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -338,6 +338,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__set_tracking_url( to_load_error_http_path(rs->get_error_log_file_path())); } + if (rs->wal_id() > 0) { + params.__set_txn_id(rs->wal_id()); + params.__set_label(rs->import_label()); + } } } if (!req.runtime_state->export_output_files().empty()) { diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 8a81c942fd3..464f9f51221 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -257,7 +257,7 @@ Status GroupCommitTable::get_first_block_load_queue( std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<pipeline::Dependency> create_plan_dep, - std::shared_ptr<pipeline::Dependency> put_block_dep) { + std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) { DCHECK(table_id == _table_id); std::unique_lock l(_lock); auto try_to_get_matched_queue = [&]() -> Status { @@ -266,6 +266,8 @@ Status GroupCommitTable::get_first_block_load_queue( if (base_schema_version == inner_block_queue->schema_version) { if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { load_block_queue = inner_block_queue; + label = inner_block_queue->label; + txn_id = inner_block_queue->txn_id; return Status::OK(); } } else { @@ -561,7 +563,7 @@ Status GroupCommitMgr::get_first_block_load_queue( std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> create_plan_dep, - std::shared_ptr<pipeline::Dependency> put_block_dep) { + std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) { std::shared_ptr<GroupCommitTable> group_commit_table; { std::lock_guard wlock(_lock); @@ -574,7 +576,7 @@ Status GroupCommitMgr::get_first_block_load_queue( } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker, - create_plan_dep, put_block_dep)); + create_plan_dep, put_block_dep, label, txn_id)); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 36c51746ef4..e9ea152ea5c 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -159,7 +159,8 @@ public: int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> create_plan_dep, - std::shared_ptr<pipeline::Dependency> put_block_dep); + std::shared_ptr<pipeline::Dependency> put_block_dep, + std::string& label, int64_t& txn_id); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, std::shared_ptr<pipeline::Dependency> get_block_dep); @@ -205,7 +206,8 @@ public: int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> create_plan_dep, - std::shared_ptr<pipeline::Dependency> put_block_dep); + std::shared_ptr<pipeline::Dependency> put_block_dep, + std::string& label, int64_t& txn_id); std::promise<Status> debug_promise; std::future<Status> debug_future = debug_promise.get_future(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java deleted file mode 100644 index 08686561a41..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java +++ /dev/null @@ -1,256 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.plans.commands.insert; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.EnvFactory; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.nereids.NereidsPlanner; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.Cast; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; -import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; -import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.GroupCommitPlanner; -import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.proto.InternalService; -import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.SqlModeHelper; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.transaction.TransactionStatus; - -import com.google.common.collect.ImmutableList; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -/** - * Handle group commit - */ -public class GroupCommitInsertExecutor extends AbstractInsertExecutor { - public static final Logger LOG = LogManager.getLogger(GroupCommitInsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; - protected final NereidsPlanner planner; - private long txnId = INVALID_TXN_ID; - private TransactionStatus txnStatus = TransactionStatus.ABORTED; - - public GroupCommitInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, - Optional<InsertCommandContext> insertCtx, boolean emptyInsert) { - super(ctx, table, labelName, planner, insertCtx, emptyInsert); - this.planner = planner; - } - - /** - * Handle group commit - */ - public static boolean canGroupCommit(ConnectContext ctx, DataSink sink, - PhysicalSink physicalSink, NereidsPlanner planner) { - // The flag is set to false before execute sql, if it is true, this is a http stream - if (ctx.isGroupCommit()) { - return false; - } - PhysicalOlapTableSink<?> olapSink = 
(PhysicalOlapTableSink<?>) physicalSink; - boolean can = analyzeGroupCommit(ctx, sink, olapSink, planner); - ctx.setGroupCommit(can); - return can; - } - - private static boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink, - PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner planner) { - if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit() - || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) { - return false; - } - OlapTable targetTable = physicalOlapTableSink.getTargetTable(); - return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES - && !ctx.isTxnModel() - && physicalOlapTableSink.getPartitionIds().isEmpty() - && targetTable.getTableProperty().getUseSchemaLightChange() - && !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME) - && isGroupCommitAvailablePlan(physicalOlapTableSink, planner); - } - - private static boolean literalExpr(NereidsPlanner planner) { - Optional<PhysicalUnion> union = planner.getPhysicalPlan() - .<PhysicalUnion>collect(PhysicalUnion.class::isInstance).stream().findAny(); - List<List<NamedExpression>> constantExprsList = null; - if (union.isPresent()) { - constantExprsList = union.get().getConstantExprsList(); - } - Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan() - .<PhysicalOneRowRelation>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny(); - if (oneRowRelation.isPresent()) { - constantExprsList = ImmutableList.of(oneRowRelation.get().getProjects()); - } - for (List<NamedExpression> row : constantExprsList) { - for (Expression expr : row) { - while (expr instanceof Alias || expr instanceof Cast) { - expr = expr.child(0); - } - if (!(expr instanceof Literal)) { - return false; - } - } - } - return true; - } - - private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? 
extends Plan> sink, - NereidsPlanner planner) { - Plan child = sink.child(); - if (child instanceof PhysicalDistribute) { - child = child.child(0); - } - return (child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0)) - && literalExpr(planner); - } - - private void handleGroupCommit(ConnectContext ctx, DataSink sink, - PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner planner) throws Exception { - // TODO we should refactor this to remove rely on UnionNode - List<InternalService.PDataRow> rows = new ArrayList<>(); - - Optional<PhysicalUnion> union = planner.getPhysicalPlan() - .<PhysicalUnion>collect(PhysicalUnion.class::isInstance).stream().findAny(); - List<List<NamedExpression>> constantExprsList = null; - if (union.isPresent()) { - constantExprsList = union.get().getConstantExprsList(); - } - Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan() - .<PhysicalOneRowRelation>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny(); - if (oneRowRelation.isPresent()) { - constantExprsList = ImmutableList.of(oneRowRelation.get().getProjects()); - } - - // should set columns of sink since we maybe generate some invisible columns - List<Column> fullSchema = physicalOlapTableSink.getTargetTable().getFullSchema(); - List<Column> targetSchema; - if (physicalOlapTableSink.getTargetTable().getFullSchema().size() != physicalOlapTableSink.getCols().size()) { - targetSchema = fullSchema; - } else { - targetSchema = new ArrayList<>(physicalOlapTableSink.getCols()); - } - List<String> columnNames = targetSchema.stream() - .map(Column::getName) - .map(n -> n.replace("`", "``")) - .collect(Collectors.toList()); - for (List<NamedExpression> row : constantExprsList) { - rows.add(InsertUtils.getRowStringValue(row)); - } - GroupCommitPlanner groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner( - physicalOlapTableSink.getDatabase(), - physicalOlapTableSink.getTargetTable(), columnNames, ctx.queryId(), - ConnectContext.get().getSessionVariable().getGroupCommit()); - PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); - TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); - // TODO: in legacy, there is a retry, we need to implement - if (code != TStatusCode.OK) { - String errMsg = "group commit insert failed. 
query_id: " + DebugUtil.printId(ConnectContext.get().queryId()) - + ", backend id: " + groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus(); - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - txnStatus = TransactionStatus.PREPARE; - String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name() - + "', 'txnId':'" + response.getTxnId() + "'" - + "', 'optimizer':'" + "nereids" + "'" - + "}"; - ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb); - ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(), - physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(), - txnStatus, response.getLoadedRows(), (int) response.getFilteredRows()); - // update it, so that user can get loaded rows in fe.audit.log - ctx.updateReturnRows((int) response.getLoadedRows()); - } - - @Override - public void beginTransaction() { - - } - - @Override - protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { - - } - - @Override - protected void beforeExec() { - } - - @Override - protected void onComplete() throws UserException { - - } - - @Override - protected void onFail(Throwable t) { - errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); - } - - @Override - protected void afterExec(StmtExecutor executor) { - - } - - protected final void execImpl() throws Exception { - Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan() - .<PhysicalOlapTableSink<?>>collect(PhysicalSink.class::isInstance)).stream() - .findAny(); - PhysicalOlapTableSink<?> olapSink = plan.get(); - DataSink sink = planner.getFragments().get(0).getSink(); - handleGroupCommit(ctx, sink, olapSink, planner); - } - - @Override - public void executeSingleInsert(StmtExecutor executor, long jobId) throws Exception { - beforeExec(); - try { - execImpl(); - onComplete(); - } catch (Throwable t) { - onFail(t); - // retry group_commit insert when meet - if (t.getMessage().contains(GroupCommitPlanner.SCHEMA_CHANGE)) { - throw t; - } - return; - } - afterExec(executor); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 39507d72685..7a1280092b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -146,7 +146,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, if (cte.isPresent()) { this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); } - + if (this.logicalQuery instanceof UnboundTableSink) { + OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, + (UnboundTableSink<?>) this.logicalQuery); + } LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); @@ -167,17 +170,16 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, if (physicalSink instanceof PhysicalOlapTableSink) { boolean emptyInsert = 
childIsEmptyRelation(physicalSink); - if (GroupCommitInsertExecutor.canGroupCommit(ctx, sink, physicalSink, planner)) { - insertExecutor = new GroupCommitInsertExecutor(ctx, targetTableIf, label, planner, insertCtx, - emptyInsert); - targetTableIf.readUnlock(); - return insertExecutor; - } OlapTable olapTable = (OlapTable) targetTableIf; // the insertCtx contains some variables to adjust SinkNode - insertExecutor = ctx.isTxnModel() - ? new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert) - : new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert); + if (ctx.isTxnModel()) { + insertExecutor = new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert); + } else if (ctx.isGroupCommit()) { + insertExecutor = new OlapGroupCommitInsertExecutor(ctx, olapTable, label, planner, insertCtx, + emptyInsert); + } else { + insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert); + } boolean isEnableMemtableOnSinkNode = olapTable.getTableProperty().getUseSchemaLightChange() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java new file mode 100644 index 00000000000..5091bae17d1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for olap table with group commit + */ +public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor { + private static final Logger LOG = LogManager.getLogger(OlapGroupCommitInsertExecutor.class); + + public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table, + String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, + boolean emptyInsert) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert); + } + + protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, UnboundTableSink<?> tableSink) { + // The flag is set to false before execute sql, if it is true, this is a http stream + if (ctx.isGroupCommit()) { + return; + } + ctx.setGroupCommit(ctx.getSessionVariable().isEnableInsertGroupCommit() && !ctx.isTxnModel() + && !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate() && table instanceof OlapTable + && ((OlapTable) table).getTableProperty().getUseSchemaLightChange() + && !((OlapTable) table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME) + && tableSink.getPartitions().isEmpty() + && (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion)); + } + + @Override + public void beginTransaction() { + } + + @Override + protected void onComplete() { + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + txnStatus = TransactionStatus.ABORTED; + } else { + txnStatus = TransactionStatus.PREPARE; + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed, url={}", labelName, queryId, coordinator.getTrackingUrl(), t); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". 
url: ").append(coordinator.getTrackingUrl()); + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); + } + + @Override + protected void afterExec(StmtExecutor executor) { + labelName = coordinator.getLabel(); + txnId = coordinator.getTxnId(); + setReturnInfo(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index b4f5503ed44..0153700863d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -74,7 +74,7 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class); protected long txnId = INVALID_TXN_ID; - private TransactionStatus txnStatus = TransactionStatus.ABORTED; + protected TransactionStatus txnStatus = TransactionStatus.ABORTED; /** * constructor @@ -274,11 +274,14 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { errMsg = "Record info of insert load with error " + e.getMessage(); } + setReturnInfo(); + } + + protected void setReturnInfo() { // {'label':'my_label1', 'status':'visible', 'txnId':'123'} // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(labelName).append("', 'status':'") - .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name()); + sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name()); sb.append("', 'txnId':'").append(txnId).append("'"); if (table.getType() == TableType.MATERIALIZED_VIEW) { sb.append("', 'rows':'").append(loadedRows).append("'"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java index ebe0a318e19..1512eca16e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.transaction.SubTransactionState.SubTransactionType; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,10 +44,7 @@ public class OlapTxnInsertExecutor extends OlapInsertExecutor { String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert) { super(ctx, table, labelName, planner, insertCtx, emptyInsert); - } - - public long getTxnId() { - return txnId; + txnStatus = TransactionStatus.PREPARE; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9e7431e07c8..8c40d1a5938 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -240,6 +240,9 @@ public 
class Coordinator implements CoordInterface { private List<String> deltaUrls; private Map<String, String> loadCounters; private String trackingUrl; + // related txnId and label of group commit + private long txnId; + private String label; // for export private List<String> exportFiles; @@ -468,6 +471,14 @@ public class Coordinator implements CoordInterface { return trackingUrl; } + public long getTxnId() { + return txnId; + } + + public String getLabel() { + return label; + } + public void setExecMemoryLimit(long execMemoryLimit) { this.queryOptions.setMemLimit(execMemoryLimit); } @@ -2346,6 +2357,12 @@ public class Coordinator implements CoordInterface { if (params.isSetTrackingUrl()) { trackingUrl = params.getTrackingUrl(); } + if (params.isSetTxnId()) { + txnId = params.getTxnId(); + } + if (params.isSetLabel()) { + label = params.getLabel(); + } if (params.isSetExportFiles()) { updateExportFiles(params.getExportFiles()); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 6b11402f299..26d4961bf09 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -506,6 +506,8 @@ struct TReportExecStatusParams { 28: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas + 29: optional i64 txn_id + 30: optional string label } struct TFeResult { diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 37d38e47b9e..9105a8dc8db 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -146,7 +146,11 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id) values(4); """, 1 group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + if (item == "nereids") { + none_group_commit_insert """ insert into ${table}(id) select 6; """, 1 + } else { + group_commit_insert """ insert into ${table}(id) select 6; """, 1 + } getRowCount(6) order_qt_select1 """ select * from ${table} order by id, name, score asc; """ @@ -160,7 +164,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id, name) values(4, 'e1'); """, 1 group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + group_commit_insert """ insert into ${table}(id) values(6); """, 1 getRowCount(11) order_qt_select2 """ select * from ${table} order by id, name, score asc; """ @@ -171,7 +175,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 sql """ alter table ${table} ADD column age int after name; """ group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1 assertTrue(getAlterTableState(), "add column should success") getRowCount(17) @@ -183,7 +187,7 @@ suite("insert_group_commit_into") { sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ sql """ truncate table ${table}; """ 
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + group_commit_insert """ insert into ${table}(id) values(6); """, 1 getRowCount(2) order_qt_select4 """ select * from ${table} order by id, name, score asc; """ @@ -194,7 +198,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 sql """ alter table ${table} order by (id, name, score, age); """ group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1 assertTrue(getAlterTableState(), "modify column order should success") getRowCount(8) @@ -206,7 +210,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 sql """ alter table ${table} DROP column age; """ group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1 assertTrue(getAlterTableState(), "drop column should success") getRowCount(14) @@ -218,7 +222,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 sql """ alter table ${table} ADD ROLLUP r1(name, score); """ group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1 getRowCount(20) order_qt_select7 """ select name, score from ${table} order by name asc; """ @@ -226,7 +230,7 @@ suite("insert_group_commit_into") { if (item == "nereids") { group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 - group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 + none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 getRowCount(23) } else { @@ -272,13 +276,10 @@ suite("insert_group_commit_into") { logger.info("observer url: " + url) connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) { sql """ set group_commit = async_mode; """ - sql """ set enable_nereids_dml = false; """ - sql """ set enable_profile= true; """ - sql """ set enable_nereids_planner = false; """ // 1. 
insert into def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 - assertTrue(server_info.contains('query_id')) + /*assertTrue(server_info.contains('query_id')) // get query_id, such as 43f87963586a482a-b0496bcf9e2b5555 def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length() def query_id = server_info.substring(query_id_index, query_id_index + 33) @@ -296,7 +297,7 @@ suite("insert_group_commit_into") { logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def json = parseJson(out) - assertEquals("success", json.msg.toLowerCase()) + assertEquals("success", json.msg.toLowerCase())*/ } } } else { diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy index c0eb1431432..2c69824e7ab 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy @@ -195,21 +195,19 @@ suite("insert_group_commit_into_max_filter_ratio") { sql """ set group_commit = sync_mode; """ group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10); """, 1 sql """ set group_commit = async_mode; """ - group_commit_insert """ insert into ${dbTableName}(id) select 2; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(2); """, 1 sql """ set group_commit = off_mode; """ off_mode_group_commit_insert """ insert into ${dbTableName} values (3, 'a', 10); """, 1 + sql """ set group_commit = async_mode; """ if (item == "nereids") { - sql """ set group_commit = async_mode; """ - normal_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0 - sql """ set enable_insert_strict = false; """ - normal_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0 + group_commit_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0 } else { - sql """ set group_commit = async_mode; """ fail_group_commit_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0 - sql """ set enable_insert_strict = false; """ - group_commit_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0 } + sql """ set enable_insert_strict = false; """ + group_commit_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0 + // The row 6 and 7 is different between legacy and nereids try { sql """ set group_commit = off_mode; """ diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy index 1bb978da1d1..502ef2906d5 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy @@ -102,7 +102,7 @@ suite("insert_group_commit_into_unique") { } group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(6); """, 1 group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 @@ -188,7 +188,7 @@ suite("insert_group_commit_into_unique") { } group_commit_insert """ insert into ${dbTableName} 
values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score) values(6, 60); """, 1 group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 group_commit_insert """ insert into ${dbTableName}(score, id, name) values(30, 2, 'b'); """, 1 @@ -275,7 +275,7 @@ suite("insert_group_commit_into_unique") { } group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 60, 600); """, 1 group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy index d9382dca3f4..93d39f303ad 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy @@ -140,7 +140,7 @@ suite("insert_group_commit_into_unique_sync_mode") { } group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(6); """, 1 group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 @@ -228,7 +228,7 @@ suite("insert_group_commit_into_unique_sync_mode") { } group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score) values(6, 60); """, 1 group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 sql """ set group_commit = OFF_MODE; """ @@ -319,7 +319,7 @@ suite("insert_group_commit_into_unique_sync_mode") { } group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 60, 600); """, 1 group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 group_commit_insert """ insert into ${dbTableName}(score, id, name, 
__DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 diff --git a/regression-test/suites/insert_p0/test_group_commit_timeout.groovy b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy index 7866a33df0e..3af6b9b11a0 100644 --- a/regression-test/suites/insert_p0/test_group_commit_timeout.groovy +++ b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy @@ -46,7 +46,7 @@ suite("test_group_commit_timeout", "nonConcurrent") { } catch (Exception e) { long end = System.currentTimeMillis() logger.info("failed " + e.getMessage()) - assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to cancel timeout instance")) + assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to cancel timeout instance") || e.getMessage().contains("Execute timeout")) assertTrue(end - start <= 60000) } finally { sql "SET global query_timeout = ${query_timeout[0][1]}" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org