This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit eea9b56f69f6619b35797fc2d7b742fb5ef31f58 Author: meiyi <myime...@gmail.com> AuthorDate: Tue Mar 5 18:55:03 2024 +0800 [fix](group commit) handle group commit create plan error (#31757) --- be/src/olap/wal/wal_table.cpp | 38 ++++++++-------------- be/src/olap/wal/wal_table.h | 2 +- be/src/runtime/group_commit_mgr.cpp | 22 +++++-------- be/src/runtime/group_commit_mgr.h | 2 +- be/test/olap/wal/wal_manager_test.cpp | 2 -- .../apache/doris/service/FrontendServiceImpl.java | 5 ++- 6 files changed, 27 insertions(+), 44 deletions(-) diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 0389c6fed21..14e3779748c 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -45,10 +45,6 @@ WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) } WalTable::~WalTable() {} -#ifdef BE_TEST -Status k_stream_load_exec_status; -#endif - void WalTable::add_wal(int64_t wal_id, std::string wal) { std::lock_guard<std::mutex> lock(_replay_wal_lock); LOG(INFO) << "add replay wal=" << wal; @@ -94,19 +90,18 @@ Status WalTable::_relay_wal_one_by_one() { for (auto wal_info : _replaying_queue) { wal_info->add_retry_num(); auto st = _replay_wal_internal(wal_info->get_wal_path()); - if (!st.ok()) { - doris::wal_fail << 1; - LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path() - << ", st=" << st.to_string(); - if (!st.is<ErrorCode::NOT_FOUND>() && !st.is<ErrorCode::DATA_QUALITY_ERROR>()) { - need_retry_wals.push_back(wal_info); - } else { - need_delete_wals.push_back(wal_info); - } - } else { + auto msg = st.msg(); + if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() || + st.is<ErrorCode::DATA_QUALITY_ERROR>() || + msg.find("LabelAlreadyUsedException") != msg.npos) { LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); need_delete_wals.push_back(wal_info); + } else { + doris::wal_fail << 1; + LOG(WARNING) << "failed to replay wal=" << 
wal_info->get_wal_path() + << ", st=" << st.to_string(); + need_retry_wals.push_back(wal_info); } } { @@ -200,7 +195,7 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); } #endif - return _replay_one_txn_with_stremaload(wal_id, wal, label); + return _replay_one_wal_with_streamload(wal_id, wal, label); } Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, @@ -241,6 +236,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, ctx->sql_str = sql_str; ctx->wal_id = wal_id; ctx->label = label; + ctx->need_commit_self = false; ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now ctx->auth.user = "admin"; ctx->group_commit = false; @@ -263,19 +259,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, return st; } -Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, +Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal, const std::string& label) { - bool success = false; #ifndef BE_TEST - auto st = _handle_stream_load(wal_id, wal, label); - auto msg = st.msg(); - success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || - msg.find("LabelAlreadyUsedException") != msg.npos; + return _handle_stream_load(wal_id, wal, label); #else - success = k_stream_load_exec_status.ok(); - auto st = Status::OK(); + return Status::OK(); #endif - return success ? 
Status::OK() : st; } void WalTable::stop() { diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h index f6ed3d865b9..0e781cd9153 100644 --- a/be/src/olap/wal/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -50,7 +50,7 @@ private: Status _get_column_info(int64_t db_id, int64_t tb_id, std::map<int64_t, std::string>& column_info_map); - Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, + Status _replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal, const std::string& label); Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); Status _construct_sql_str(const std::string& wal, const std::string& label, diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 1e231055cff..887dd856bb4 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -209,13 +209,16 @@ Status GroupCommitTable::get_first_block_load_queue( "schema version not match, maybe a schema change is in process. 
Please " "retry this load manually."); } - if (!_need_plan_fragment) { - _need_plan_fragment = true; + if (!_is_creating_plan_fragment) { + _is_creating_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { auto st = _create_group_commit_load(load_block_queue, be_exe_version); if (!st.ok()) { - LOG(WARNING) << "fail to create block queue,st=" << st.to_string(); + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); load_block_queue.reset(); + std::unique_lock l(_lock); + _is_creating_plan_fragment = false; + _cv.notify_all(); } })); } @@ -241,13 +244,6 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) { Status st = Status::OK(); - std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) { - if (!st.ok()) { - std::unique_lock l(_lock); - _need_plan_fragment = false; - _cv.notify_all(); - } - }); TStreamLoadPutRequest request; UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; @@ -281,13 +277,13 @@ Status GroupCommitTable::_create_group_commit_load( 10000L); if (!st.ok()) { LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string(); + return st; } - RETURN_IF_ERROR(st); st = Status::create<false>(result.status); if (!st.ok()) { LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + return st; } - RETURN_IF_ERROR(st); auto schema_version = result.base_schema_version; auto is_pipeline = result.__isset.pipeline_params; auto& params = result.params; @@ -326,7 +322,7 @@ Status GroupCommitTable::_create_group_commit_load( be_exe_version)); } _load_block_queues.emplace(instance_id, load_block_queue); - _need_plan_fragment = false; + _is_creating_plan_fragment = false; _cv.notify_all(); } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h index 5357ba208f7..dc135ea1063 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -156,7 +156,7 @@ private: std::condition_variable _cv; // fragment_instance_id to load_block_queue std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues; - bool _need_plan_fragment = false; + bool _is_creating_plan_fragment = false; }; class GroupCommitMgr { diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index d264cb621c3..2dc59cec9d9 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -44,7 +44,6 @@ namespace doris { extern TLoadTxnBeginResult k_stream_load_begin_result; -extern Status k_stream_load_exec_status; ExecEnv* _env = nullptr; std::filesystem::path wal_dir = std::filesystem::current_path().string() + "/wal_test"; @@ -103,7 +102,6 @@ public: TEST_F(WalManagerTest, recovery_normal) { _env->wal_mgr()->wal_limit_test_bytes = 1099511627776; - k_stream_load_exec_status = Status::OK(); std::string db_id = "1"; int64_t tb_1_id = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 850686116e2..1316425df4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1687,12 +1687,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { try { loadTxnRollbackImpl(request); } catch (MetaNotFoundException e) { - String msg = "failed to rollback txn" + request.getTxnId(); - LOG.warn(msg, e); + LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); status.setStatusCode(TStatusCode.NOT_FOUND); status.addToErrorMsgs(e.getMessage()); } catch (UserException e) { - LOG.warn("failed to rollback txn {}: {}", 
request.getTxnId(), e.getMessage()); + LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (Throwable e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org