This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 550ed2b6e75 [Fix](executor)Fix when Fe send empty wg list to be may cause query failed. (#34074) 550ed2b6e75 is described below commit 550ed2b6e75a6c4b698f5f6b1fa9027a218e2cdb Author: wangbo <wan...@apache.org> AuthorDate: Wed Apr 24 23:36:17 2024 +0800 [Fix](executor)Fix when Fe send empty wg list to be may cause query failed. (#34074) --- be/src/agent/topic_subscriber.cpp | 6 +- be/src/agent/workload_group_listener.cpp | 27 +++++++-- be/src/runtime/workload_group/workload_group.cpp | 4 +- be/src/runtime/workload_group/workload_group.h | 7 ++- .../workload_group/workload_group_manager.cpp | 67 ++++++++++++---------- .../main/java/org/apache/doris/catalog/Env.java | 22 +++---- .../doris/common/publish/TopicPublisherThread.java | 24 ++++++-- .../common/publish/WorkloadGroupPublisher.java | 14 ++++- 8 files changed, 114 insertions(+), 57 deletions(-) diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp index 7f7cffd8840..f62bdaef099 100644 --- a/be/src/agent/topic_subscriber.cpp +++ b/be/src/agent/topic_subscriber.cpp @@ -40,12 +40,14 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques // eg, update workload info may delay other listener, then we need add a thread here // to handle_topic_info asynchronous std::shared_lock lock(_listener_mtx); - LOG(INFO) << "begin handle topic info"; + LOG(INFO) << "[topic_publish]begin handle topic info"; for (auto& listener_pair : _registered_listeners) { if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) { + LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first + << ", size=" << topic_request.topic_map.at(listener_pair.first).size(); listener_pair.second->handle_topic_info( topic_request.topic_map.at(listener_pair.first)); - LOG(INFO) << "handle topic " << listener_pair.first << " successfully"; + LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first; } } } diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index f98315fa433..822e3c692f7 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -26,21 +26,27 @@ namespace doris { void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) { std::set<uint64_t> current_wg_ids; + bool is_set_workload_group_info = false; + int list_size = topic_info_list.size(); for (const TopicInfo& topic_info : topic_info_list) { if (!topic_info.__isset.workload_group_info) { continue; } + is_set_workload_group_info = true; // 1 parse topicinfo to group info WorkloadGroupInfo workload_group_info; Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info, &workload_group_info); + // it means FE has this wg, but may parse failed, so we should not delete it. + if (workload_group_info.id != 0) { + current_wg_ids.insert(workload_group_info.id); + } if (!ret.ok()) { - LOG(INFO) << "parse topic info failed, tg_id=" << workload_group_info.id - << ", reason:" << ret.to_string(); + LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id=" + << workload_group_info.id << ", reason:" << ret.to_string(); continue; } - current_wg_ids.insert(workload_group_info.id); // 2 update workload group auto tg = @@ -53,16 +59,25 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi // 4 create and update task scheduler tg->upsert_task_scheduler(&workload_group_info, _exec_env); - LOG(INFO) << "update workload group finish, tg info=" << tg->debug_string() - << ", enable_cpu_hard_limit=" + LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info=" + << tg->debug_string() << ", enable_cpu_hard_limit=" << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false") << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit << ", enable_cgroup_cpu_soft_limit=" << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") - << ", cgroup home path=" << config::doris_cgroup_cpu_path; + << ", cgroup home path=" << config::doris_cgroup_cpu_path + << ", list size=" << list_size; } + // NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list + // this should not happens, because FE should has at least one normal group. + // just log it if that happens + if (!is_set_workload_group_info) { + LOG(INFO) << "[topic_publish_wg]unexpected error happens, no workload group info is " + "set, list size=" + << list_size; + } _exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids); } } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 1b15e89b08e..673263f1a17 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -68,11 +68,11 @@ std::string WorkloadGroup::debug_string() const { "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = " "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, " - "spill_low_watermark={}, spill_high_watermark={}]", + "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}]", _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(), _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num, - _spill_low_watermark, _spill_high_watermark); + _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size()); } void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 49bcd841a0f..d4ef689766a 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -119,7 +119,7 @@ public: // If the workload group is set shutdown, then should not run any more, // because the scheduler pool and other pointer may be released. return Status::InternalError( - "Failed add query to workload group, the workload group is shutdown. host: {}", + "Failed add query to wg {}, the workload group is shutdown. host: {}", _id, BackendOptions::get_localhost()); } _query_ctxs.insert({query_id, query_ctx}); @@ -136,6 +136,11 @@ public: _is_shutdown = true; } + bool can_be_dropped() { + std::shared_lock<std::shared_mutex> r_lock(_mutex); + return _is_shutdown && _query_ctxs.size() == 0; + } + int query_num() { std::shared_lock<std::shared_mutex> r_lock(_mutex); return _query_ctxs.size(); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index e336c9f80a8..a0e0de75f36 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -76,35 +76,40 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i int64_t begin_time = MonotonicMillis(); // 1 get delete group without running queries std::vector<WorkloadGroupPtr> deleted_task_groups; + int old_wg_size = 0; + int new_wg_size = 0; { std::lock_guard<std::shared_mutex> write_lock(_group_mutex); + old_wg_size = _workload_groups.size(); for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { - uint64_t tg_id = iter->first; + uint64_t wg_id = iter->first; auto workload_group_ptr = iter->second; - if (used_wg_id.find(tg_id) == used_wg_id.end()) { + if (used_wg_id.find(wg_id) == used_wg_id.end()) { workload_group_ptr->shutdown(); - // only when no query running in workload group, its resource can be released in BE - if (workload_group_ptr->query_num() == 0) { - LOG(INFO) << "There is no query in wg " << tg_id << ", delete it."; - deleted_task_groups.push_back(workload_group_ptr); - } + LOG(INFO) << "[topic_publish_wg] shutdown wg:" << wg_id; + } + // wg is shutdown and running rum = 0, its resource can be released in BE + if (workload_group_ptr->can_be_dropped()) { + LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it."; + deleted_task_groups.push_back(workload_group_ptr); } } } // 2 stop active thread - for (auto& tg : deleted_task_groups) { + for (auto& wg : deleted_task_groups) { // There is not lock here, but the tg may be released by another - // thread, so that we should use shared ptr here, not use tg_id - tg->try_stop_schedulers(); + // thread, so that we should use shared ptr here, not use wg_id + wg->try_stop_schedulers(); } // 3 release resource in memory { std::lock_guard<std::shared_mutex> write_lock(_group_mutex); - for (auto& tg : deleted_task_groups) { - _workload_groups.erase(tg->id()); + for (auto& wg : deleted_task_groups) { + _workload_groups.erase(wg->id()); } + new_wg_size = _workload_groups.size(); } // 4 clear cgroup dir @@ -113,28 +118,32 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i // So the first time to rmdir a cgroup path may failed. // Using cgdelete has no such issue. { - std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock); - if (!_cg_cpu_ctl) { - _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(); - } - if (!_is_init_succ) { - Status ret = _cg_cpu_ctl->init(); - if (ret.ok()) { - _is_init_succ = true; - } else { - LOG(INFO) << "init workload group mgr cpu ctl failed, " << ret.to_string(); + if (config::doris_cgroup_cpu_path != "") { + std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock); + if (!_cg_cpu_ctl) { + _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(); } - } - if (_is_init_succ) { - Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id); - if (!ret.ok()) { - LOG(WARNING) << ret.to_string(); + if (!_is_init_succ) { + Status ret = _cg_cpu_ctl->init(); + if (ret.ok()) { + _is_init_succ = true; + } else { + LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, " + << ret.to_string(); + } + } + if (_is_init_succ) { + Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id); + if (!ret.ok()) { + LOG(WARNING) << "[topic_publish_wg]" << ret.to_string(); + } } } } int64_t time_cost_ms = MonotonicMillis() - begin_time; - LOG(INFO) << "finish clear unused workload group, time cost: " << time_cost_ms - << "ms, deleted group size:" << deleted_task_groups.size(); + LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms + << "ms, deleted group size:" << deleted_task_groups.size() + << ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size; } struct WorkloadGroupMemInfo { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 90d55931af5..0514f22aed3 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1066,16 +1066,6 @@ public class Env { } queryCancelWorker.start(); - - TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); - topicPublisherThread.addToTopicPublisherList(wgPublisher); - WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this); - topicPublisherThread.addToTopicPublisherList(wpPublisher); - topicPublisherThread.start(); - - workloadGroupMgr.startUpdateThread(); - workloadSchedPolicyMgr.start(); - workloadRuntimeStatusMgr.start(); } // wait until FE is ready. @@ -1718,6 +1708,13 @@ public class Env { binlogGcer.start(); columnIdFlusher.start(); insertOverwriteManager.start(); + + TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); + topicPublisherThread.addToTopicPublisherList(wgPublisher); + WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this); + topicPublisherThread.addToTopicPublisherList(wpPublisher); + topicPublisherThread.start(); + } // start threads that should run on all FE @@ -1739,6 +1736,11 @@ public class Env { } dnsCache.start(); + + workloadGroupMgr.startUpdateThread(); + workloadSchedPolicyMgr.start(); + workloadRuntimeStatusMgr.start(); + } private void transferToNonMaster(FrontendNodeType newType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java index 9af6bd392cb..dde45e44e29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -26,6 +26,8 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,6 +36,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; public class TopicPublisherThread extends MasterDaemon { @@ -59,7 +62,7 @@ public class TopicPublisherThread extends MasterDaemon { @Override protected void runAfterCatalogReady() { - LOG.info("begin publish topic info"); + LOG.info("[topic_publish]begin publish topic info"); // step 1: get all publish topic info TPublishTopicRequest request = new TPublishTopicRequest(); for (TopicPublisher topicPublisher : topicPublisherList) { @@ -106,16 +109,24 @@ public class TopicPublisherThread extends MasterDaemon { BackendService.Client client = null; TNetworkAddress address = null; boolean ok = false; + String logStr = ""; + try { + for (Map.Entry<TTopicInfoType, List<TopicInfo>> entry : request.getTopicMap().entrySet()) { + logStr += " " + entry.getKey() + "=" + entry.getValue().size() + " "; + } + } catch (Exception e) { + LOG.warn("[topic_publish]make log detail for publish failed:", e); + } try { address = new TNetworkAddress(be.getHost(), be.getBePort()); client = ClientPool.backendPool.borrowObject(address); client.publishTopicInfo(request); ok = true; - LOG.info("publish topic info to be {} success, time cost={} ms", - be.getHost(), (System.currentTimeMillis() - beginTime)); + LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}", + be.getHost(), (System.currentTimeMillis() - beginTime), logStr); } catch (Exception e) { - LOG.warn("publish topic info to be {} error happens: , time cost={} ms", - be.getHost(), (System.currentTimeMillis() - beginTime), e); + LOG.warn("[topic_publish]publish topic info to be {} error happens: , time cost={} ms, details:{}", + be.getHost(), (System.currentTimeMillis() - beginTime), logStr, e); } finally { try { if (ok) { @@ -124,7 +135,8 @@ public class TopicPublisherThread extends MasterDaemon { ClientPool.backendPool.invalidateObject(address, client); } } catch (Throwable e) { - LOG.warn("recycle topic publish client failed. related backend[{}]", be.getHost(), e); + LOG.warn("[topic_publish]recycle topic publish client failed. related backend[{}]", be.getHost(), + e); } handler.onResponse(be); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java index 45b3664631e..ea8ac9256e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java @@ -18,14 +18,20 @@ package org.apache.doris.common.publish; import org.apache.doris.catalog.Env; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.thrift.TPublishTopicRequest; import org.apache.doris.thrift.TTopicInfoType; import org.apache.doris.thrift.TopicInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; public class WorkloadGroupPublisher implements TopicPublisher { + private static final Logger LOG = LogManager.getLogger(WorkloadGroupPublisher.class); + private Env env; public WorkloadGroupPublisher(Env env) { @@ -35,6 +41,12 @@ public class WorkloadGroupPublisher implements TopicPublisher { @Override public void getTopicInfo(TPublishTopicRequest req) { List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo(); - req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list); + if (list.size() == 0) { + LOG.warn("[topic_publish]currently, doris at least has one workload group named " + + WorkloadGroupMgr.DEFAULT_GROUP_NAME + + ", so get a size 0 here is an error, should check it."); + } else { + req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org