This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 550ed2b6e75 [Fix](executor) Fix query failure when FE sends an empty workload group list to BE. (#34074)
550ed2b6e75 is described below

commit 550ed2b6e75a6c4b698f5f6b1fa9027a218e2cdb
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Apr 24 23:36:17 2024 +0800

    [Fix](executor) Fix query failure when FE sends an empty workload group list to BE. (#34074)
---
 be/src/agent/topic_subscriber.cpp                  |  6 +-
 be/src/agent/workload_group_listener.cpp           | 27 +++++++--
 be/src/runtime/workload_group/workload_group.cpp   |  4 +-
 be/src/runtime/workload_group/workload_group.h     |  7 ++-
 .../workload_group/workload_group_manager.cpp      | 67 ++++++++++++----------
 .../main/java/org/apache/doris/catalog/Env.java    | 22 +++----
 .../doris/common/publish/TopicPublisherThread.java | 24 ++++++--
 .../common/publish/WorkloadGroupPublisher.java     | 14 ++++-
 8 files changed, 114 insertions(+), 57 deletions(-)

diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index 7f7cffd8840..f62bdaef099 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,12 +40,14 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     // eg, update workload info may delay other listener, then we need add a thread here
     // to handle_topic_info asynchronous
     std::shared_lock lock(_listener_mtx);
-    LOG(INFO) << "begin handle topic info";
+    LOG(INFO) << "[topic_publish]begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
         if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
+            LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
+                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "handle topic " << listener_pair.first << " successfully";
+            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
         }
     }
 }
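
The dispatch rule in this hunk is what makes the FE-side guard effective: a listener only runs when its topic key is present in the published map, so a publisher that omits the WORKLOAD_GROUP key (rather than sending an empty list) skips the BE listener entirely. A minimal sketch of that rule, with hypothetical names standing in for the Doris types:

    // Sketch only: "TopicMap"/"dispatch" are illustrative names, not Doris APIs.
    #include <functional>
    #include <map>
    #include <string>
    #include <vector>

    using TopicMap = std::map<std::string, std::vector<int>>;
    using Listener = std::function<void(const std::vector<int>&)>;

    void dispatch(const TopicMap& topic_map, const std::map<std::string, Listener>& listeners) {
        for (const auto& [key, listener] : listeners) {
            auto it = topic_map.find(key);
            if (it != topic_map.end()) {
                listener(it->second);  // an absent key means the listener is never invoked
            }
        }
    }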
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index f98315fa433..822e3c692f7 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -26,21 +26,27 @@ namespace doris {
 
 void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
     std::set<uint64_t> current_wg_ids;
+    bool is_set_workload_group_info = false;
+    int list_size = topic_info_list.size();
     for (const TopicInfo& topic_info : topic_info_list) {
         if (!topic_info.__isset.workload_group_info) {
             continue;
         }
+        is_set_workload_group_info = true;
 
         // 1 parse topicinfo to group info
         WorkloadGroupInfo workload_group_info;
         Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
                                                          &workload_group_info);
+        // it means FE has this wg, but parsing may have failed, so we should not delete it.
+        if (workload_group_info.id != 0) {
+            current_wg_ids.insert(workload_group_info.id);
+        }
         if (!ret.ok()) {
-            LOG(INFO) << "parse topic info failed, tg_id=" << workload_group_info.id
-                      << ", reason:" << ret.to_string();
+            LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
+                      << workload_group_info.id << ", reason:" << ret.to_string();
             continue;
         }
-        current_wg_ids.insert(workload_group_info.id);
 
         // 2 update workload group
         auto tg =
@@ -53,16 +59,25 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
         // 4 create and update task scheduler
         tg->upsert_task_scheduler(&workload_group_info, _exec_env);
 
-        LOG(INFO) << "update workload group finish, tg info=" << tg->debug_string()
-                  << ", enable_cpu_hard_limit="
+        LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
+                  << tg->debug_string() << ", enable_cpu_hard_limit="
                   << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
                   << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
                   << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
                   << ", enable_cgroup_cpu_soft_limit="
                   << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
-                  << ", cgroup home path=" << config::doris_cgroup_cpu_path;
+                  << ", cgroup home path=" << config::doris_cgroup_cpu_path
+                  << ", list size=" << list_size;
     }
 
+    // NOTE(wb) when is_set_workload_group_info=false, it means FE sent an empty workload group list;
+    // this should not happen, because FE should have at least one normal group.
+    // just log it if that happens
+    if (!is_set_workload_group_info) {
+        LOG(INFO) << "[topic_publish_wg]unexpected error happens, no workload group info is "
+                     "set, list size="
+                  << list_size;
+    }
     _exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids);
 }
 } // namespace doris
\ No newline at end of file
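
Two defensive changes in this listener are worth calling out: `current_wg_ids` now records a group id even when `parse_topic_info` fails (FE clearly knows the group, so BE must not delete it over a transient parse error), and an entirely empty list is logged as unexpected rather than silently accepted. A minimal sketch of the id-retention rule, using hypothetical stand-in types:

    // Sketch only: "GroupUpdate"/"collect_live_ids" are illustrative names.
    #include <cstdint>
    #include <set>
    #include <vector>

    struct GroupUpdate {
        uint64_t id;    // 0 when the id itself could not be parsed
        bool parse_ok;  // whether the rest of the info parsed cleanly
    };

    std::set<uint64_t> collect_live_ids(const std::vector<GroupUpdate>& updates) {
        std::set<uint64_t> live_ids;
        for (const auto& u : updates) {
            // Keep the id even on parse failure: FE published this group,
            // so a bad payload must not get it deleted on BE.
            if (u.id != 0) {
                live_ids.insert(u.id);
            }
        }
        return live_ids;
    }

Every local group whose id is absent from the returned set then becomes a deletion candidate in `delete_workload_group_by_ids`.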
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 1b15e89b08e..673263f1a17 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -68,11 +68,11 @@ std::string WorkloadGroup::debug_string() const {
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
             "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
             "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
-            "spill_low_watermark={}, spill_high_watermark={}]",
+            "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
             _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
-            _spill_low_watermark, _spill_high_watermark);
+            _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size());
 }
 
 void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 49bcd841a0f..d4ef689766a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -119,7 +119,7 @@ public:
             // If the workload group is set shutdown, then should not run any more,
             // because the scheduler pool and other pointer may be released.
             return Status::InternalError(
-                    "Failed add query to workload group, the workload group is shutdown. host: {}",
+                    "Failed add query to wg {}, the workload group is shutdown. host: {}", _id,
                     BackendOptions::get_localhost());
         }
         _query_ctxs.insert({query_id, query_ctx});
@@ -136,6 +136,11 @@ public:
         _is_shutdown = true;
     }
 
+    bool can_be_dropped() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _is_shutdown && _query_ctxs.size() == 0;
+    }
+
     int query_num() {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         return _query_ctxs.size();
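
The new `can_be_dropped()` reads the shutdown flag and the query count under a single shared lock, so the manager never observes `_is_shutdown` and `_query_ctxs` at two different moments (checking `query_num()` separately, as the old manager code did, allowed a query to slip in between the checks). A self-contained sketch of the same accessor pattern, with hypothetical member names:

    // Sketch only: a reader-writer-locked state object mirroring the pattern.
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    class GroupState {
    public:
        void shutdown() {
            std::unique_lock<std::shared_mutex> w_lock(_mutex);
            _is_shutdown = true;
        }

        // Both conditions are evaluated under one shared lock, so a writer
        // cannot interleave between the shutdown check and the query count.
        bool can_be_dropped() {
            std::shared_lock<std::shared_mutex> r_lock(_mutex);
            return _is_shutdown && _queries.empty();
        }

    private:
        std::shared_mutex _mutex;
        bool _is_shutdown = false;
        std::unordered_map<int, int> _queries;  // stand-in for query contexts
    };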
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index e336c9f80a8..a0e0de75f36 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -76,35 +76,40 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
     std::vector<WorkloadGroupPtr> deleted_task_groups;
+    int old_wg_size = 0;
+    int new_wg_size = 0;
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+        old_wg_size = _workload_groups.size();
         for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
-            uint64_t tg_id = iter->first;
+            uint64_t wg_id = iter->first;
             auto workload_group_ptr = iter->second;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+            if (used_wg_id.find(wg_id) == used_wg_id.end()) {
                 workload_group_ptr->shutdown();
-                // only when no query running in workload group, its resource can be released in BE
-                if (workload_group_ptr->query_num() == 0) {
-                    LOG(INFO) << "There is no query in wg " << tg_id << ", delete it.";
-                    deleted_task_groups.push_back(workload_group_ptr);
-                }
+                LOG(INFO) << "[topic_publish_wg] shutdown wg:" << wg_id;
+            }
+            // wg is shutdown and running query num = 0, its resource can be released in BE
+            if (workload_group_ptr->can_be_dropped()) {
+                LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id << ", delete it.";
+                deleted_task_groups.push_back(workload_group_ptr);
             }
         }
     }
 
     // 2 stop active thread
-    for (auto& tg : deleted_task_groups) {
+    for (auto& wg : deleted_task_groups) {
         // There is not lock here, but the tg may be released by another
-        // thread, so that we should use shared ptr here, not use tg_id
-        tg->try_stop_schedulers();
+        // thread, so that we should use shared ptr here, not use wg_id
+        wg->try_stop_schedulers();
     }
 
     // 3 release resource in memory
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-        for (auto& tg : deleted_task_groups) {
-            _workload_groups.erase(tg->id());
+        for (auto& wg : deleted_task_groups) {
+            _workload_groups.erase(wg->id());
         }
+        new_wg_size = _workload_groups.size();
     }
 
     // 4 clear cgroup dir
@@ -113,28 +118,32 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
     // So the first time to rmdir a cgroup path may failed.
     // Using cgdelete has no such issue.
     {
-        std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
-        if (!_cg_cpu_ctl) {
-            _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
-        }
-        if (!_is_init_succ) {
-            Status ret = _cg_cpu_ctl->init();
-            if (ret.ok()) {
-                _is_init_succ = true;
-            } else {
-                LOG(INFO) << "init workload group mgr cpu ctl failed, " << ret.to_string();
+        if (config::doris_cgroup_cpu_path != "") {
+            std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
+            if (!_cg_cpu_ctl) {
+                _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
             }
-        }
-        if (_is_init_succ) {
-            Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
-            if (!ret.ok()) {
-                LOG(WARNING) << ret.to_string();
+            if (!_is_init_succ) {
+                Status ret = _cg_cpu_ctl->init();
+                if (ret.ok()) {
+                    _is_init_succ = true;
+                } else {
+                    LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, "
+                              << ret.to_string();
+                }
+            }
+            if (_is_init_succ) {
+                Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
+                if (!ret.ok()) {
+                    LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
+                }
             }
         }
     }
     int64_t time_cost_ms = MonotonicMillis() - begin_time;
-    LOG(INFO) << "finish clear unused workload group, time cost: " << time_cost_ms
-              << "ms, deleted group size:" << deleted_task_groups.size();
+    LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
+              << "ms, deleted group size:" << deleted_task_groups.size()
+              << ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
 }
 
 struct WorkloadGroupMemInfo {
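
The deletion sequence in this file is deliberately three-phase: mark groups as shut down under the write lock, stop their schedulers outside the lock (the shared_ptr keeps each group alive, and stopping schedulers may block), then erase them under the lock again. A compact sketch of that shape, with hypothetical names in place of the Doris types:

    // Sketch only: "drop_unused" is illustrative, not the Doris function.
    #include <cstdint>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <set>
    #include <shared_mutex>
    #include <vector>

    struct Group {
        uint64_t id = 0;
        bool is_shutdown = false;
        bool can_be_dropped() const { return is_shutdown; }  // real code also requires zero queries
        void stop_schedulers() {}                            // may block, so it runs unlocked
    };

    void drop_unused(std::map<uint64_t, std::shared_ptr<Group>>& groups,
                     const std::set<uint64_t>& live_ids, std::shared_mutex& mtx) {
        std::vector<std::shared_ptr<Group>> dropped;
        {
            // Phase 1: mark and collect under the write lock.
            std::lock_guard<std::shared_mutex> lock(mtx);
            for (auto& [id, group] : groups) {
                if (live_ids.find(id) == live_ids.end()) {
                    group->is_shutdown = true;
                }
                if (group->can_be_dropped()) {
                    dropped.push_back(group);
                }
            }
        }
        // Phase 2: stop schedulers outside the lock; shared_ptr keeps groups alive.
        for (auto& group : dropped) {
            group->stop_schedulers();
        }
        {
            // Phase 3: erase under the write lock.
            std::lock_guard<std::shared_mutex> lock(mtx);
            for (auto& group : dropped) {
                groups.erase(group->id);
            }
        }
    }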
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 90d55931af5..0514f22aed3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1066,16 +1066,6 @@ public class Env {
         }
 
         queryCancelWorker.start();
-
-        TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
-        topicPublisherThread.addToTopicPublisherList(wgPublisher);
-        WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
-        topicPublisherThread.addToTopicPublisherList(wpPublisher);
-        topicPublisherThread.start();
-
-        workloadGroupMgr.startUpdateThread();
-        workloadSchedPolicyMgr.start();
-        workloadRuntimeStatusMgr.start();
     }
 
     // wait until FE is ready.
@@ -1718,6 +1708,13 @@ public class Env {
         binlogGcer.start();
         columnIdFlusher.start();
         insertOverwriteManager.start();
+
+        TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
+        topicPublisherThread.addToTopicPublisherList(wgPublisher);
+        WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
+        topicPublisherThread.addToTopicPublisherList(wpPublisher);
+        topicPublisherThread.start();
+
     }
 
     // start threads that should run on all FE
@@ -1739,6 +1736,11 @@ public class Env {
         }
 
         dnsCache.start();
+
+        workloadGroupMgr.startUpdateThread();
+        workloadSchedPolicyMgr.start();
+        workloadRuntimeStatusMgr.start();
+
     }
 
     private void transferToNonMaster(FrontendNodeType newType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index 9af6bd392cb..dde45e44e29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -26,6 +26,8 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -34,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 public class TopicPublisherThread extends MasterDaemon {
@@ -59,7 +62,7 @@ public class TopicPublisherThread extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        LOG.info("begin publish topic info");
+        LOG.info("[topic_publish]begin publish topic info");
         // step 1: get all publish topic info
         TPublishTopicRequest request = new TPublishTopicRequest();
         for (TopicPublisher topicPublisher : topicPublisherList) {
@@ -106,16 +109,24 @@ public class TopicPublisherThread extends MasterDaemon {
             BackendService.Client client = null;
             TNetworkAddress address = null;
             boolean ok = false;
+            String logStr = "";
+            try {
+                for (Map.Entry<TTopicInfoType, List<TopicInfo>> entry : request.getTopicMap().entrySet()) {
+                    logStr += " " + entry.getKey() + "=" + entry.getValue().size() + " ";
+                }
+            } catch (Exception e) {
+                LOG.warn("[topic_publish]make log detail for publish failed:", e);
+            }
             try {
                 address = new TNetworkAddress(be.getHost(), be.getBePort());
                 client = ClientPool.backendPool.borrowObject(address);
                 client.publishTopicInfo(request);
                 ok = true;
-                LOG.info("publish topic info to be {} success, time cost={} ms",
-                        be.getHost(), (System.currentTimeMillis() - beginTime));
+                LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}",
+                        be.getHost(), (System.currentTimeMillis() - beginTime), logStr);
             } catch (Exception e) {
-                LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
-                        be.getHost(), (System.currentTimeMillis() - beginTime), e);
+                LOG.warn("[topic_publish]publish topic info to be {} error happens: , time cost={} ms, details:{}",
+                        be.getHost(), (System.currentTimeMillis() - beginTime), logStr, e);
             } finally {
                 try {
                     if (ok) {
@@ -124,7 +135,8 @@ public class TopicPublisherThread extends MasterDaemon {
                         ClientPool.backendPool.invalidateObject(address, client);
                     }
                 } catch (Throwable e) {
-                    LOG.warn("recycle topic publish client failed. related backend[{}]", be.getHost(), e);
+                    LOG.warn("[topic_publish]recycle topic publish client failed. related backend[{}]", be.getHost(),
+                            e);
                 }
                 handler.onResponse(be);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 45b3664631e..ea8ac9256e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -18,14 +18,20 @@
 package org.apache.doris.common.publish;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TTopicInfoType;
 import org.apache.doris.thrift.TopicInfo;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 
 public class WorkloadGroupPublisher implements TopicPublisher {
 
+    private static final Logger LOG = LogManager.getLogger(WorkloadGroupPublisher.class);
+
     private Env env;
 
     public WorkloadGroupPublisher(Env env) {
@@ -35,6 +41,12 @@ public class WorkloadGroupPublisher implements TopicPublisher {
     @Override
     public void getTopicInfo(TPublishTopicRequest req) {
         List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo();
-        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
+        if (list.size() == 0) {
+            LOG.warn("[topic_publish]currently, doris at least has one workload group named "
+                    + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                    + ", so get a size 0 here is an error, should check it.");
+        } else {
+            req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
