This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 24969d5ad42  [fix](group commit) Fix some group commit problems (#37595)
24969d5ad42 is described below

commit 24969d5ad42a21a2fe90e6a69c66c45bbdb54e1d
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Jul 11 15:20:02 2024 +0800

    [fix](group commit) Fix some group commit problems (#37595)

    ## Proposed changes

    Fix some problems found by https://github.com/apache/doris/pull/26856/files:

    1. Some inserts should not run in group commit mode, including INSERT OVERWRITE
       and inserts into materialized views (MTMV).
    2. If there is no partition for the data, an exception should be thrown.
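For point 1, a minimal, self-contained Java sketch of the intended gating may help; the class, method, and parameter names below are illustrative placeholders I introduced, not the actual Doris FE types (in the patch itself the overwrite check lives in InsertIntoTableCommand and the MTMV check in OlapGroupCommitInsertExecutor, shown in the diff below).

// Hypothetical illustration of the gating described in point 1; not the Doris FE code.
public final class GroupCommitEligibilitySketch {

    // Group commit is only considered for a plain INSERT: an INSERT OVERWRITE never
    // takes the group commit path, and an insert into an MTMV is only allowed when
    // modifying MTMV data is explicitly permitted.
    static boolean canUseGroupCommit(boolean groupCommitEnabled,
                                     boolean isInsertOverwrite,
                                     boolean targetIsMtmv,
                                     boolean allowModifyMtmvData) {
        if (!groupCommitEnabled || isInsertOverwrite) {
            return false;
        }
        return !targetIsMtmv || allowModifyMtmvData;
    }

    public static void main(String[] args) {
        System.out.println(canUseGroupCommit(true, true, false, false));  // overwrite -> false
        System.out.println(canUseGroupCommit(true, false, false, false)); // plain insert -> true
        System.out.println(canUseGroupCommit(true, false, true, false));  // MTMV, not allowed -> false
    }
}

In the actual patch these two conditions correspond to OlapInsertCommandContext.isOverwrite() and MTMVUtil.allowModifyMTMVData(ctx).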
---
 .../exec/group_commit_block_sink_operator.cpp      | 24 +++++++++++++++-------
 be/src/runtime/group_commit_mgr.cpp                | 24 +++++++++++-----------
 be/src/runtime/group_commit_mgr.h                  |  6 ++----
 .../glue/translator/PhysicalPlanTranslator.java    |  5 +++--
 .../commands/insert/InsertIntoTableCommand.java    |  4 +++-
 .../insert/OlapGroupCommitInsertExecutor.java      |  3 +++
 .../insert_p0/insert_group_commit_into.groovy      |  2 ++
 regression-test/suites/insert_p0/txn_insert.groovy |  2 +-
 .../test_group_commit_stream_load.groovy           |  2 +-
 9 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 424ede07be5..9bac6d4ed29 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -61,21 +61,19 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
                                                         "CreateGroupCommitPlanDependency", true);
     _put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                       "GroupCommitPutBlockDependency", true);
-    WARN_IF_ERROR(_initialize_load_queue(), "");
+    [[maybe_unused]] auto st = _initialize_load_queue();
     return Status::OK();
 }
 
 Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
     if (_state->exec_env()->wal_mgr()->is_running()) {
-        std::string label;
-        int64_t txn_id;
         RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
                 p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
                 _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
-                _put_block_dependency, label, txn_id));
-        _state->set_import_label(label);
-        _state->set_wal_id(txn_id); // wal_id is txn_id
+                _put_block_dependency));
+        _state->set_import_label(_load_block_queue->label);
+        _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
         return Status::OK();
     } else {
         return Status::InternalError("be is stopping");
@@ -339,13 +337,25 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
             local_state._vpartition->find_partition(block.get(), index,
                                                     local_state._partitions[index]);
         }
+        bool stop_processing = false;
         for (int row_index = 0; row_index < rows; row_index++) {
             if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
                 local_state._filter_bitmap.Set(row_index, true);
                 LOG(WARNING) << "no partition for this tuple. tuple="
                              << block->dump_data(row_index, 1);
+                RETURN_IF_ERROR(state->append_error_msg_to_file(
+                        []() -> std::string { return ""; },
+                        [&]() -> std::string {
+                            fmt::memory_buffer buf;
+                            fmt::format_to(buf, "no partition for this tuple. tuple=\n{}",
+                                           block->dump_data(row_index, 1));
+                            return fmt::to_string(buf);
+                        },
+                        &stop_processing));
+                local_state._has_filtered_rows = true;
+                state->update_num_rows_load_filtered(1);
+                state->update_num_rows_load_total(-1);
             }
-            local_state._has_filtered_rows = true;
         }
     }
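The second hunk above stops silently skipping rows that have no matching partition: each such row is now appended to the load error log and the filtered/total row counters are adjusted. Below is a rough, self-contained Java sketch of that bookkeeping; the Row type, the partition lookup, and the counter holder are placeholders I introduced for illustration, not the actual BE structures, which operate on vectorized blocks.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical illustration of the bookkeeping in the hunk above; not the BE code itself.
public final class NoPartitionAccountingSketch {

    record Row(int id, String partitionKey) {}

    static final class LoadCounters {
        long numRowsLoadTotal;
        long numRowsLoadFiltered;
        final List<String> errorLog = new ArrayList<>();
    }

    // Rows without a partition are logged and counted as filtered, not silently dropped.
    static List<Row> keepRowsWithPartition(List<Row> rows,
                                           Function<Row, Optional<String>> findPartition,
                                           LoadCounters counters) {
        List<Row> kept = new ArrayList<>();
        for (Row row : rows) {
            if (findPartition.apply(row).isEmpty()) {
                counters.errorLog.add("no partition for this tuple. tuple=" + row);
                counters.numRowsLoadFiltered += 1; // mirrors update_num_rows_load_filtered(1)
                counters.numRowsLoadTotal -= 1;    // mirrors update_num_rows_load_total(-1)
            } else {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        LoadCounters counters = new LoadCounters();
        counters.numRowsLoadTotal = 3;
        List<Row> rows = List.of(new Row(1, "p1"), new Row(2, null), new Row(3, "p1"));
        List<Row> kept = keepRowsWithPartition(rows,
                r -> Optional.ofNullable(r.partitionKey()), counters);
        System.out.println(kept.size() + " kept, " + counters.numRowsLoadFiltered
                + " filtered, errors=" + counters.errorLog);
    }
}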
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index f0a57162458..5f989da023b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -138,7 +138,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
     auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                        std::chrono::steady_clock::now() - _last_print_time)
                                        .count();
-    if (last_print_duration >= 5000) {
+    if (last_print_duration >= 10000) {
         _last_print_time = std::chrono::steady_clock::now();
         LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
                   << ", label=" << label << ", instance_id=" << load_instance_id
@@ -258,15 +258,13 @@ Status GroupCommitTable::get_first_block_load_queue(
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
         std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
-        std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (inner_block_queue->contain_load_id(load_id)) {
                 load_block_queue = inner_block_queue;
-                label = inner_block_queue->label;
-                txn_id = inner_block_queue->txn_id;
                 return Status::OK();
             }
         }
@@ -275,8 +273,6 @@ Status GroupCommitTable::get_first_block_load_queue(
             if (base_schema_version == inner_block_queue->schema_version) {
                 if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
                     load_block_queue = inner_block_queue;
-                    label = inner_block_queue->label;
-                    txn_id = inner_block_queue->txn_id;
                     return Status::OK();
                 }
             } else {
@@ -319,6 +315,10 @@ Status GroupCommitTable::get_first_block_load_queue(
 
 void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
     std::unique_lock l(_lock);
+    if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
+        _create_plan_deps.erase(load_id);
+        return;
+    }
     for (const auto& [_, inner_block_queue] : _load_block_queues) {
         if (inner_block_queue->remove_load_id(load_id).ok()) {
             return;
@@ -391,13 +391,13 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
                 instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
                 result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
                 result.group_commit_data_bytes);
-        std::unique_lock l(_lock);
         RETURN_IF_ERROR(load_block_queue->create_wal(
                 _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
                 pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                 be_exe_version));
-        _load_block_queues.emplace(instance_id, load_block_queue);
+        std::unique_lock l(_lock);
+        _load_block_queues.emplace(instance_id, load_block_queue);
         std::vector<UniqueId> success_load_ids;
         for (const auto& [id, load_info] : _create_plan_deps) {
             auto create_dep = std::get<0>(load_info);
@@ -409,8 +409,8 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
                 }
             }
         }
-        for (const auto& id2 : success_load_ids) {
-            _create_plan_deps.erase(id2);
+        for (const auto& id : success_load_ids) {
+            _create_plan_deps.erase(id);
         }
     }
 }
@@ -610,7 +610,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
         std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
-        std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -623,7 +623,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
             table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
-            create_plan_dep, put_block_dep, label, txn_id));
+            create_plan_dep, put_block_dep));
     return Status::OK();
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 16c7e0c24d3..c6cb34a022a 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -160,8 +160,7 @@ public:
                                      int be_exe_version,
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
-                                     std::shared_ptr<pipeline::Dependency> put_block_dep,
-                                     std::string& label, int64_t& txn_id);
+                                     std::shared_ptr<pipeline::Dependency> put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                 std::shared_ptr<pipeline::Dependency> get_block_dep);
@@ -211,8 +210,7 @@ public:
                                      int be_exe_version,
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
-                                     std::shared_ptr<pipeline::Dependency> put_block_dep,
-                                     std::string& label, int64_t& txn_id);
+                                     std::shared_ptr<pipeline::Dependency> put_block_dep);
     void remove_load_id(int64_t table_id, const UniqueId& load_id);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 62c823fdec4..b4a957e72fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -432,8 +432,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             // This statement is only used in the group_commit mode
             if (context.getConnectContext().isGroupCommit()) {
                 sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), olapTuple,
-                        olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
-                        context.getSessionVariable().getGroupCommit(), 0);
+                        olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
+                        context.getSessionVariable().getGroupCommit(),
+                        ConnectContext.get().getSessionVariable().getEnableInsertStrict() ? 0 : 1);
             } else {
                 sink = new OlapTableSink(
                         olapTableSink.getTargetTable(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7a1280092b0..df6fcd69fcb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -146,7 +146,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
         if (cte.isPresent()) {
             this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
         }
-        if (this.logicalQuery instanceof UnboundTableSink) {
+        boolean isOverwrite = insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
+                && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite();
+        if (this.logicalQuery instanceof UnboundTableSink && !isOverwrite) {
             OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf,
                     (UnboundTableSink<?>) this.logicalQuery);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 5091bae17d1..984e8b0c8ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -17,12 +17,14 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
@@ -60,6 +62,7 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
                 && ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
                 && !((OlapTable) table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
                 && tableSink.getPartitions().isEmpty()
+                && (!(table instanceof MTMV) || MTMVUtil.allowModifyMTMVData(ctx))
                 && (tableSink.child() instanceof OneRowRelation
                         || tableSink.child() instanceof LogicalUnion));
     }
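One note on the PhysicalPlanTranslator change above: the last GroupCommitBlockSink argument is now 0 when enable_insert_strict is on and 1 otherwise, and the regression test below disables strict mode before an insert that expects one of three rows to be filtered. That pattern reads like a maximum-filter-ratio style knob, but this interpretation is an assumption on my part rather than something stated in the patch. A hypothetical sketch of such a check:

// Hypothetical sketch of a max-filter-ratio style check; the name and semantics
// are assumptions for illustration, not the Doris implementation.
public final class FilterRatioSketch {

    // Returns true if the filtered/total ratio is within the allowed limit.
    static boolean withinFilterLimit(long filteredRows, long totalRows, double maxFilterRatio) {
        if (totalRows == 0) {
            return true;
        }
        return (double) filteredRows / totalRows <= maxFilterRatio;
    }

    public static void main(String[] args) {
        // Strict mode (ratio 0): any filtered row fails the load.
        System.out.println(withinFilterLimit(1, 3, 0.0)); // false
        // Non-strict mode (ratio 1): filtered rows are tolerated.
        System.out.println(withinFilterLimit(1, 3, 1.0)); // true
    }
}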
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 9105a8dc8db..e1cb943a9ab 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -219,7 +219,9 @@ suite("insert_group_commit_into") {
             // 7. insert into and add rollup
             group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
             group_commit_insert """ insert into ${table}(id) values(4); """, 1
+            sql "set enable_insert_strict=false"
             group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2
+            sql "set enable_insert_strict=true"
             sql """ alter table ${table} ADD ROLLUP r1(name, score); """
             group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
             group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy
index 088e1134900..4c42c3b12c4 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -495,7 +495,7 @@ suite("txn_insert") {
                 assertFalse(true, "should not reach here")
             } catch (Exception e) {
                 logger.info("exception: " + e)
-                assertTrue(e.getMessage().contains("The transaction is already timeout"))
+                assertTrue(e.getMessage().contains("The transaction is already timeout") || e.getMessage().contains("Execute timeout"))
             } finally {
                 try {
                     sql "rollback"
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index e12a1f2f01b..0f823f1d4ef 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, 6, 3, 2, 1)
+                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
             }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org