This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ba8a9bae40 [feature-wip](executor)Fe send topic info to be (#25798)
1ba8a9bae40 is described below

commit 1ba8a9bae40922275e09832a0d2ea589565d0d02
Author: wangbo <[email protected]>
AuthorDate: Thu Oct 26 15:52:48 2023 +0800

    [feature-wip](executor)Fe send topic info to be (#25798)
---
 be/src/agent/agent_server.cpp                      |  13 +--
 be/src/agent/agent_server.h                        |   5 +-
 be/src/agent/topic_listener.h                      |  11 +-
 be/src/agent/topic_subscriber.cpp                  |  40 ++-----
 be/src/agent/topic_subscriber.h                    |  16 +--
 be/src/agent/user_resource_listener.cpp            | 105 ------------------
 be/src/agent/workload_group_listener.cpp           |  43 ++++++++
 ...source_listener.h => workload_group_listener.h} |  26 ++---
 be/src/pipeline/task_scheduler.cpp                 |   2 +-
 be/src/runtime/task_group/task_group_manager.cpp   |  50 +++++++++
 be/src/runtime/task_group/task_group_manager.h     |   2 +
 be/src/service/backend_service.h                   |   5 +
 be/src/vec/exec/scan/scanner_scheduler.h           |  11 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 +
 .../main/java/org/apache/doris/catalog/Env.java    |  11 ++
 .../doris/common/publish/AckResponseHandler.java   |  13 ++-
 .../doris/common/publish/TopicPublisher.java       |  21 +---
 .../doris/common/publish/TopicPublisherThread.java | 121 +++++++++++++++++++++
 ...nseHandler.java => WorkloadGroupPublisher.java} |  29 ++---
 .../resource/workloadgroup/WorkloadGroup.java      |  12 ++
 .../resource/workloadgroup/WorkloadGroupMgr.java   |  15 +++
 .../org/apache/doris/common/GenericPoolTest.java   |   7 ++
 .../apache/doris/utframe/MockedBackendFactory.java |   7 ++
 gensrc/thrift/AgentService.thrift                  |   3 +-
 gensrc/thrift/BackendService.thrift                |  20 ++++
 25 files changed, 375 insertions(+), 215 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 46609023337..8198c38e45e 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -29,8 +29,8 @@
 
 #include "agent/task_worker_pool.h"
 #include "agent/topic_subscriber.h"
-#include "agent/user_resource_listener.h"
 #include "agent/utils.h"
+#include "agent/workload_group_listener.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
@@ -39,10 +39,6 @@
 #include "olap/snapshot_manager.h"
 #include "runtime/exec_env.h"
 
-namespace doris {
-class TopicListener;
-} // namespace doris
-
 using std::string;
 using std::vector;
 
@@ -134,9 +130,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
 
 #if !defined(BE_TEST) && !defined(__APPLE__)
     // Add subscriber here and register listeners
-    TopicListener* user_resource_listener = new UserResourceListener(exec_env, 
master_info);
-    LOG(INFO) << "Register user resource listener";
-    _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, 
user_resource_listener);
+    std::unique_ptr<TopicListener> wg_listener = 
std::make_unique<WorkloadGroupListener>(exec_env);
+    LOG(INFO) << "Register workload group listener";
+    
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
+                                         std::move(wg_listener));
 #endif
 }
 
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index daa1823b07e..a3e5cbd745e 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -23,10 +23,11 @@
 #include <string>
 #include <vector>
 
+#include "agent/topic_subscriber.h"
+
 namespace doris {
 
 class TaskWorkerPool;
-class TopicSubscriber;
 class ExecEnv;
 class TAgentPublishRequest;
 class TAgentResult;
@@ -52,6 +53,8 @@ public:
     // [[deprecated]]
     void publish_cluster_state(TAgentResult& agent_result, const 
TAgentPublishRequest& request);
 
+    TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(AgentServer);
 
diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h
index 0ef9c597f19..af99a78545b 100644
--- a/be/src/agent/topic_listener.h
+++ b/be/src/agent/topic_listener.h
@@ -17,19 +17,14 @@
 
 #pragma once
 
-#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/BackendService_types.h>
 
 namespace doris {
 
 class TopicListener {
 public:
     virtual ~TopicListener() {}
-    // Deal with a single update
-    //
-    // Input parameters:
-    //   protocol version: the version for the protocol, listeners should deal 
with the msg according to the protocol
-    //   topic_update: single update
-    virtual void handle_update(const TAgentServiceVersion::type& 
protocol_version,
-                               const TTopicUpdate& topic_update) = 0;
+
+    virtual void handle_topic_info(const TPublishTopicRequest& topic_request) 
= 0;
 };
 } // namespace doris
diff --git a/be/src/agent/topic_subscriber.cpp 
b/be/src/agent/topic_subscriber.cpp
index c5c4a324ebe..c3bcc29c623 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -17,7 +17,7 @@
 
 #include "agent/topic_subscriber.h"
 
-#include <gen_cpp/AgentService_types.h>
+#include <pthread.h>
 
 #include <mutex>
 #include <utility>
@@ -28,38 +28,22 @@ namespace doris {
 
 TopicSubscriber::TopicSubscriber() {}
 
-TopicSubscriber::~TopicSubscriber() {
-    // Delete all listeners in the register
-    std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it =
-            _registered_listeners.begin();
-    for (; it != _registered_listeners.end(); ++it) {
-        std::vector<TopicListener*>& listeners = it->second;
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        for (; listener_it != listeners.end(); ++listener_it) {
-            delete *listener_it;
-        }
-    }
-}
-
-void TopicSubscriber::register_listener(TTopicType::type topic_type, 
TopicListener* listener) {
+void TopicSubscriber::register_listener(TTopicInfoType::type topic_type,
+                                        std::unique_ptr<TopicListener> 
topic_listener) {
     // Unique lock here to prevent access to listeners
     std::lock_guard<std::shared_mutex> lock(_listener_mtx);
-    this->_registered_listeners[topic_type].push_back(listener);
+    this->_registered_listeners.emplace(topic_type, std::move(topic_listener));
 }
 
-void TopicSubscriber::handle_updates(const TAgentPublishRequest& 
agent_publish_request) {
-    // Shared lock here in order to avoid updates in listeners' map
+void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& 
topic_request) {
+    // NOTE(wb): if LOG(INFO) timings show that handle_topic_info becomes a bottleneck
+    // (e.g. a slow workload group update delays the other listeners), dispatch it to a
+    // dedicated thread so that topic info is handled asynchronously.
     std::shared_lock lock(_listener_mtx);
-    // Currently, not deal with protocol version, the listener should deal 
with protocol version
-    const std::vector<TTopicUpdate>& topic_updates = 
agent_publish_request.updates;
-    std::vector<TTopicUpdate>::const_iterator topic_update_it = 
topic_updates.begin();
-    for (; topic_update_it != topic_updates.end(); ++topic_update_it) {
-        std::vector<TopicListener*>& listeners = 
this->_registered_listeners[topic_update_it->type];
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        // Send the update to all listeners with protocol version.
-        for (; listener_it != listeners.end(); ++listener_it) {
-            
(*listener_it)->handle_update(agent_publish_request.protocol_version, 
*topic_update_it);
-        }
+    LOG(INFO) << "begin handle topic info";
+    for (auto& listener_pair : _registered_listeners) {
+        listener_pair.second->handle_topic_info(topic_request);
+        LOG(INFO) << "handle topic " << listener_pair.first << " succ";
     }
 }
 } // namespace doris
diff --git a/be/src/agent/topic_subscriber.h b/be/src/agent/topic_subscriber.h
index 490bd35d2b0..7adcd0ea372 100644
--- a/be/src/agent/topic_subscriber.h
+++ b/be/src/agent/topic_subscriber.h
@@ -17,7 +17,8 @@
 
 #pragma once
 
-#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/BackendService_types.h>
+#include <glog/logging.h>
 
 #include <map>
 #include <shared_mutex>
@@ -29,14 +30,15 @@ class TopicListener;
 class TopicSubscriber {
 public:
     TopicSubscriber();
-    ~TopicSubscriber();
-    // Put the topic type and listener to the map
-    void register_listener(TTopicType::type topic_type, TopicListener* 
listener);
-    // Handle all updates in the request
-    void handle_updates(const TAgentPublishRequest& agent_publish_request);
+    ~TopicSubscriber() = default;
+
+    void register_listener(TTopicInfoType::type topic_type,
+                           std::unique_ptr<TopicListener> topic_listener);
+
+    void handle_topic_info(const TPublishTopicRequest& topic_request);
 
 private:
-    std::map<TTopicType::type, std::vector<TopicListener*>> 
_registered_listeners;
+    std::map<TTopicInfoType::type, std::unique_ptr<TopicListener>> 
_registered_listeners;
     std::shared_mutex _listener_mtx;
 };
 } // namespace doris
diff --git a/be/src/agent/user_resource_listener.cpp 
b/be/src/agent/user_resource_listener.cpp
deleted file mode 100644
index 6a73edf6dbc..00000000000
--- a/be/src/agent/user_resource_listener.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "agent/user_resource_listener.h"
-
-#include <gen_cpp/AgentService_types.h>
-#include <gen_cpp/FrontendService.h>
-#include <gen_cpp/HeartbeatService_types.h>
-#include <gen_cpp/MasterService_types.h>
-#include <gen_cpp/Types_types.h>
-#include <glog/logging.h>
-#include <thrift/Thrift.h>
-#include <thrift/transport/TTransportException.h>
-
-#include <future>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-
-namespace doris {
-
-using std::string;
-using apache::thrift::TException;
-using apache::thrift::transport::TTransportException;
-
-// Initialize the resource to cgroups file mapping
-// TRESOURCE_IOPS  not mapped
-
-UserResourceListener::UserResourceListener(ExecEnv* exec_env, const 
TMasterInfo& master_info)
-        : _master_info(master_info), _exec_env(exec_env) {}
-
-UserResourceListener::~UserResourceListener() {}
-
-void UserResourceListener::handle_update(const TAgentServiceVersion::type& 
protocol_version,
-                                         const TTopicUpdate& topic_update) {
-    std::vector<TTopicItem> updates = topic_update.updates;
-    if (updates.size() > 0) {
-        int64_t new_version = updates[0].int_value;
-        // Async call to update users resource method
-        auto res = std::async(std::launch::async, 
&UserResourceListener::update_users_resource,
-                              this, new_version);
-        res.get();
-    }
-}
-
-void UserResourceListener::update_users_resource(int64_t new_version) {
-    // Call fe to get latest user resource
-    Status master_status;
-    // using 500ms as default timeout value
-    FrontendServiceConnection client(_exec_env->frontend_client_cache(),
-                                     _master_info.network_address, 
config::thrift_rpc_timeout_ms,
-                                     &master_status);
-    TFetchResourceResult new_fetched_resource;
-    if (!master_status.ok()) {
-        LOG(ERROR) << "Get frontend client failed, with address:"
-                   << _master_info.network_address.hostname << ":"
-                   << _master_info.network_address.port;
-        return;
-    }
-    try {
-        try {
-            client->fetchResource(new_fetched_resource);
-        } catch (TTransportException& e) {
-            // reopen the client and set timeout to 500ms
-            master_status = client.reopen(config::thrift_rpc_timeout_ms);
-
-            if (!master_status.ok()) {
-                LOG(WARNING) << "Reopen to get frontend client failed, with 
address:"
-                             << _master_info.network_address.hostname << ":"
-                             << _master_info.network_address.port << ", 
reason=" << e.what();
-                return;
-            }
-            LOG(WARNING) << "fetchResource from frontend failed"
-                         << ", reason=" << e.what();
-            client->fetchResource(new_fetched_resource);
-        }
-    } catch (TException& e) {
-        // Already try twice, log here
-        static_cast<void>(client.reopen(config::thrift_rpc_timeout_ms));
-        LOG(WARNING) << "retry to fetchResource from  " << 
_master_info.network_address.hostname
-                     << ":" << _master_info.network_address.port << " 
failed:\n"
-                     << e.what();
-        return;
-    }
-}
-} // namespace doris
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
new file mode 100644
index 00000000000..bf27861c284
--- /dev/null
+++ b/be/src/agent/workload_group_listener.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "agent/workload_group_listener.h"
+
+#include "runtime/task_group/task_group.h"
+#include "runtime/task_group/task_group_manager.h"
+#include "util/mem_info.h"
+#include "util/parse_util.h"
+
+namespace doris {
+
+void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& 
topic_request) {
+    std::set<uint64_t> current_wg_ids;
+    for (const TopicInfo& topic_info : topic_request.topic_list) {
+        if (topic_info.topic_type != 
doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+            continue;
+        }
+
+        int wg_id = 0;
+        auto iter2 = topic_info.info_map.find("id");
+        std::from_chars(iter2->second.c_str(), iter2->second.c_str() + 
iter2->second.size(), wg_id);
+
+        current_wg_ids.insert(wg_id);
+    }
+
+    _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/agent/user_resource_listener.h 
b/be/src/agent/workload_group_listener.h
similarity index 54%
rename from be/src/agent/user_resource_listener.h
rename to be/src/agent/workload_group_listener.h
index 6fd25bef671..d31b1c4ef65 100644
--- a/be/src/agent/user_resource_listener.h
+++ b/be/src/agent/workload_group_listener.h
@@ -17,31 +17,21 @@
 
 #pragma once
 
-#include <gen_cpp/AgentService_types.h>
-#include <stdint.h>
+#include <glog/logging.h>
 
 #include "agent/topic_listener.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 
-class ExecEnv;
-class TMasterInfo;
-
-class UserResourceListener : public TopicListener {
+class WorkloadGroupListener : public TopicListener {
 public:
-    ~UserResourceListener();
-    // Input parameters:
-    //   root_cgroups_path: root cgroups allocated by admin to doris
-    UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info);
-    // This method should be async
-    virtual void handle_update(const TAgentServiceVersion::type& 
protocol_version,
-                               const TTopicUpdate& topic_update);
+    ~WorkloadGroupListener() {}
+    WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+    void handle_topic_info(const TPublishTopicRequest& topic_request) override;
 
 private:
-    const TMasterInfo& _master_info;
     ExecEnv* _exec_env;
-    // Call cgroups mgr to update user's cgroups resource share
-    // Also refresh local user resource's cache
-    void update_users_resource(int64_t new_version);
 };
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 4ef5def6f1d..cdd934d5c7d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -189,6 +189,7 @@ void 
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
 
 TaskScheduler::~TaskScheduler() {
     stop();
+    LOG(INFO) << "Task scheduler " << _name << " shutdown";
 }
 
 Status TaskScheduler::start() {
@@ -363,7 +364,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state,
 void TaskScheduler::stop() {
     if (!this->_shutdown.load()) {
         this->_shutdown.store(true);
-        _blocked_task_scheduler->shutdown();
         if (_task_queue) {
             _task_queue->close();
         }
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index e6ed60148e0..b3c24fa96e7 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -122,6 +122,56 @@ Status 
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
     return Status::OK();
 }
 
+void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
+    {
+        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
+            uint64_t tg_id = iter->first;
+            if (id_set.find(tg_id) == id_set.end()) {
+                iter = _task_groups.erase(iter);
+            } else {
+                iter++;
+            }
+        }
+    }
+
+    // Stopping a scheduler can be slow, so it must not happen under the lock: schedulers are
+    // moved into these local sets and stopped in their destructors once the sets go out of scope.
+    std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
+    std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
scan_task_sche_to_del;
+    {
+        std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
+            uint64_t tg_id = iter->first;
+            if (id_set.find(tg_id) == id_set.end()) {
+                task_sche_to_del.insert(std::move(_tg_sche_map[tg_id]));
+                iter = _tg_sche_map.erase(iter);
+            } else {
+                iter++;
+            }
+        }
+
+        for (auto iter = _tg_scan_sche_map.begin(); iter != 
_tg_scan_sche_map.end();) {
+            uint64_t tg_id = iter->first;
+            if (id_set.find(tg_id) == id_set.end()) {
+                
scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id]));
+                iter = _tg_scan_sche_map.erase(iter);
+            } else {
+                iter++;
+            }
+        }
+
+        for (auto iter = _cgroup_ctl_map.begin(); iter != 
_cgroup_ctl_map.end();) {
+            uint64_t tg_id = iter->first;
+            if (id_set.find(tg_id) == id_set.end()) {
+                iter = _cgroup_ctl_map.erase(iter);
+            } else {
+                iter++;
+            }
+        }
+    }
+}
+
 void TaskGroupManager::stop() {
     for (auto& task_sche : _tg_sche_map) {
         task_sche.second->stop();
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index e45cdeca7ea..ae501e93f3e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -54,6 +54,8 @@ public:
     Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, 
int cpu_hard_limit,
                                          ExecEnv* exec_env, QueryContext* 
query_ctx_ptr);
 
+    void delete_task_group_by_ids(std::set<uint64_t> id_set);
+
     void stop();
 
 private:
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index e98dd65a8c7..8ad55e43e6e 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,6 +90,11 @@ public:
         _agent_server->publish_cluster_state(result, request);
     }
 
+    void publish_topic_info(TPublishTopicResult& result,
+                            const TPublishTopicRequest& topic_request) 
override {
+        
_agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
+    }
+
     // DorisServer service
     void exec_plan_fragment(TExecPlanFragmentResult& return_val,
                             const TExecPlanFragmentParams& params) override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index ad6f86c4f18..32048458ed9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -144,8 +144,14 @@ public:
         _wg_name = wg_name;
     }
 
+    ~SimplifiedScanScheduler() {
+        stop();
+        LOG(INFO) << "Scanner sche " << _wg_name << " shutdown";
+    }
+
     void stop() {
         _is_stop.store(true);
+        _scan_task_queue->shutdown();
         _scan_thread_pool->shutdown();
         _scan_thread_pool->wait();
     }
@@ -169,8 +175,9 @@ private:
     void _work() {
         while (!_is_stop.load()) {
             SimplifiedScanTask scan_task;
-            _scan_task_queue->blocking_get(&scan_task);
-            scan_task.scan_func();
+            if (_scan_task_queue->blocking_get(&scan_task)) {
+                scan_task.scan_func();
+            };
         }
     }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c50acaa0873..c9b75cb84f5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2268,4 +2268,6 @@ public class Config extends ConfigBase {
     })
     public static int sync_image_timeout_second = 300;
 
+    @ConfField(mutable = true, masterOnly = true)
+    public static int publish_topic_info_interval_ms = 30000; // 30s
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8e46daaf936..9a6d13552d6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -113,6 +113,9 @@ import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.CountingDataOutputStream;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.publish.TopicPublisher;
+import org.apache.doris.common.publish.TopicPublisherThread;
+import org.apache.doris.common.publish.WorkloadGroupPublisher;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.HttpURLUtil;
@@ -492,6 +495,8 @@ public class Env {
 
     private HiveTransactionMgr hiveTransactionMgr;
 
+    private TopicPublisherThread topicPublisherThread;
+
     public List<TFrontendInfo> getFrontendInfos() {
         List<TFrontendInfo> res = new ArrayList<>();
 
@@ -726,6 +731,8 @@ public class Env {
         this.binlogGcer = new BinlogGcer();
         this.columnIdFlusher = new ColumnIdFlushDaemon();
         this.queryCancelWorker = new QueryCancelWorker(systemInfo);
+        this.topicPublisherThread = new TopicPublisherThread(
+                "TopicPublisher", Config.publish_topic_info_interval_ms, 
systemInfo);
     }
 
     public static void destroyCheckpoint() {
@@ -970,6 +977,10 @@ public class Env {
         }
 
         queryCancelWorker.start();
+
+        TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
+        topicPublisherThread.addToTopicPublisherList(wgPublisher);
+        topicPublisherThread.start();
     }
 
     // wait until FE is ready.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
index b95469d6910..f9d15a1ae5a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
@@ -30,15 +30,24 @@ public class AckResponseHandler extends ResponseHandler {
         this.listener = listener;
     }
 
+    public AckResponseHandler(Collection<Backend> nodes) {
+        super(nodes);
+        this.listener = null;
+    }
+
     @Override
     public void onResponse(Backend node) {
         super.onResponse(node);
-        listener.onResponse(node);
+        if (listener != null) {
+            listener.onResponse(node);
+        }
     }
 
     @Override
     public void onFailure(Backend node, Throwable t) {
         super.onFailure(node, t);
-        listener.onFailure(node, t);
+        if (listener != null) {
+            listener.onFailure(node, t);
+        }
     }
 }
diff --git a/be/src/agent/topic_listener.h 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
similarity index 60%
copy from be/src/agent/topic_listener.h
copy to 
fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
index 0ef9c597f19..24086cb0e7f 100644
--- a/be/src/agent/topic_listener.h
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
@@ -15,21 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+package org.apache.doris.common.publish;
 
-#include <gen_cpp/AgentService_types.h>
+import org.apache.doris.thrift.TPublishTopicRequest;
 
-namespace doris {
-
-class TopicListener {
-public:
-    virtual ~TopicListener() {}
-    // Deal with a single update
-    //
-    // Input parameters:
-    //   protocol version: the version for the protocol, listeners should deal 
with the msg according to the protocol
-    //   topic_update: single update
-    virtual void handle_update(const TAgentServiceVersion::type& 
protocol_version,
-                               const TTopicUpdate& topic_update) = 0;
-};
-} // namespace doris
+public interface TopicPublisher {
+    public void getTopicInfo(TPublishTopicRequest req);
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
new file mode 100644
index 00000000000..616c8a30b56
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.publish;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class TopicPublisherThread extends MasterDaemon {
+
+    private static final Logger LOG = 
LogManager.getLogger(TopicPublisherThread.class);
+
+    private SystemInfoService clusterInfoService;
+
+    private ExecutorService executor = ThreadPoolManager
+            .newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true);
+
+    public TopicPublisherThread(String name, long intervalMs,
+            SystemInfoService clusterInfoService) {
+        super(name, intervalMs);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private List<TopicPublisher> topicPublisherList = new ArrayList<>();
+
+    public void addToTopicPublisherList(TopicPublisher topicPublisher) {
+        this.topicPublisherList.add(topicPublisher);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Config.enable_workload_group) {
+            return;
+        }
+        LOG.info("begin publish topic info");
+        // step 1: collect topic info from all registered publishers
+        TPublishTopicRequest request = new TPublishTopicRequest();
+        for (TopicPublisher topicPublisher : topicPublisherList) {
+            topicPublisher.getTopicInfo(request);
+        }
+
+        // step 2: publish topic info to all be
+        Collection<Backend> nodesToPublish = 
clusterInfoService.getIdToBackend().values();
+        AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
+        for (Backend be : nodesToPublish) {
+            executor.submit(new TopicPublishWorker(request, be, handler));
+        }
+        try {
+            int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2;
+            if (!handler.awaitAllInMs(timeoutMs)) {
+                Backend[] backends = handler.pendingNodes();
+                if (backends.length > 0) {
+                    LOG.warn("timed out waiting for all nodes to publish. 
(pending nodes: {})",
+                            Arrays.toString(backends));
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public class TopicPublishWorker implements Runnable {
+        private TPublishTopicRequest request;
+        private Backend be;
+        private ResponseHandler handler;
+
+        public TopicPublishWorker(TPublishTopicRequest request, Backend node, 
ResponseHandler handler) {
+            this.request = request;
+            this.be = node;
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() {
+            long beginTime = System.currentTimeMillis();
+            try {
+                TNetworkAddress addr = new TNetworkAddress(be.getHost(), 
be.getBePort());
+                BackendService.Client client = 
ClientPool.backendPool.borrowObject(addr);
+                client.publishTopicInfo(request);
+                LOG.info("publish topic info to be {} success, time cost={} 
ms",
+                        be.getHost(), (System.currentTimeMillis() - 
beginTime));
+            } catch (Exception e) {
+                LOG.warn("publish topic info to be {} error happens: , time 
cost={} ms",
+                        be.getHost(), (System.currentTimeMillis() - 
beginTime), e);
+            } finally {
+                handler.onResponse(be);
+            }
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
similarity index 59%
copy from 
fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index b95469d6910..2330700ce7b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -17,28 +17,23 @@
 
 package org.apache.doris.common.publish;
 
-import org.apache.doris.system.Backend;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TopicInfo;
 
-import java.util.Collection;
+/**
+ * TopicPublisher that contributes the current workload groups to a topic publish request.
+ */
+public class WorkloadGroupPublisher implements TopicPublisher {
 
-// Response handler contain a listener
-public class AckResponseHandler extends ResponseHandler {
-    private Listener listener;
+    // Used to reach the WorkloadGroupMgr each time topic info is collected.
+    private Env env;
 
-    public AckResponseHandler(Collection<Backend> nodes, Listener listener) {
-        super(nodes);
-        this.listener = listener;
+    public WorkloadGroupPublisher(Env env) {
+        this.env = env;
     }
 
+    // Appends one TopicInfo per workload group (as returned by the WorkloadGroupMgr) to req's topic list.
     @Override
-    public void onResponse(Backend node) {
-        super.onResponse(node);
-        listener.onResponse(node);
-    }
-
-    @Override
-    public void onFailure(Backend node, Throwable t) {
-        super.onFailure(node, t);
-        listener.onFailure(node, t);
+    public void getTopicInfo(TPublishTopicRequest req) {
+        for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
+                .getPublishTopicInfo()) {
+            req.addToTopicList(topicInfo);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 68115c4c842..b8aebdccab0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -290,6 +292,16 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         return new 
TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
     }
 
+    /**
+     * Describes this group as a WORKLOAD_GROUP topic entry: the group name is the topic key and
+     * the info map carries the group id under the "id" key.
+     */
+    public TopicInfo toTopicInfo() {
+        HashMap<String, String> infoMap = new HashMap<>();
+        infoMap.put("id", String.valueOf(id));
+        return new TopicInfo()
+                .setTopicKey(name)
+                .setTopicType(TTopicInfoType.WORKLOAD_GROUP)
+                .setInfoMap(infoMap);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 1ec47d053fe..5f0a52cab55 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -133,6 +135,19 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         return workloadGroups;
     }
 
+    /**
+     * Converts every registered workload group into a TopicInfo for the topic publisher.
+     * The group map is iterated under the read lock; the returned list is freshly allocated
+     * and owned by the caller.
+     */
+    public List<TopicInfo> getPublishTopicInfo() {
+        List<TopicInfo> topicInfos = new ArrayList<>();
+        readLock();
+        try {
+            for (WorkloadGroup wg : idToWorkloadGroup.values()) {
+                topicInfos.add(wg.toTopicInfo());
+            }
+        } finally {
+            readUnlock();
+        }
+        return topicInfos;
+    }
+
     public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) 
throws UserException {
         String groupName = getWorkloadGroupNameAndCheckPriv(context);
         readLock();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 3ab76732f61..a2f3867de2b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -32,6 +32,8 @@ import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TPublishTopicResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -139,6 +141,11 @@ public class GenericPoolTest {
             return null;
         }
 
+        @Override
+        public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException {
+            // Stub implementation: no topic-publish result is produced.
+            return null;
+        }
+
         @Override
         public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) 
throws TException {
             // TODO Auto-generated method stub
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index d3f5aeeacc2..058e356a325 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TMasterInfo;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TPublishTopicResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -299,6 +301,11 @@ public class MockedBackendFactory {
             return new TAgentResult(new TStatus(TStatusCode.OK));
         }
 
+        @Override
+        public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException {
+            // The mocked backend accepts every topic publish and reports an OK status.
+            TStatus okStatus = new TStatus(TStatusCode.OK);
+            return new TPublishTopicResult(okStatus);
+        }
+
         @Override
         public TStatus submitExportTask(TExportTaskRequest request) throws 
TException {
             return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 4b4c260b473..c161fc99e71 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -506,5 +506,4 @@ struct TTopicUpdate {
 struct TAgentPublishRequest {
     1: required TAgentServiceVersion protocol_version
     2: required list<TTopicUpdate> updates
-}
-
+}
\ No newline at end of file
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 3d77eab4cad..a7a9c50aed2 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -138,6 +138,24 @@ struct TIngestBinlogResult {
     1: optional Status.TStatus status;
 }
 
+// Kinds of topic information the FE can publish to backends.
+enum TTopicInfoType {
+    WORKLOAD_GROUP
+}
+
+// One unit of published topic information.
+struct TopicInfo {
+    // For WORKLOAD_GROUP topics the FE sets this to the workload group name.
+    1: optional string topic_key
+    2: required TTopicInfoType topic_type
+    // String properties of the topic; for WORKLOAD_GROUP topics the FE currently sends only "id".
+    3: optional map<string, string> info_map
+}
+
+// Argument of BackendService.publish_topic_info.
+struct TPublishTopicRequest {
+    1: required list<TopicInfo> topic_list
+}
+
+// Result of BackendService.publish_topic_info.
+struct TPublishTopicResult {
+    1: required Status.TStatus status
+}
+
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in 
backend.
     // Returns as soon as all incoming data streams have been set up.
@@ -193,4 +211,6 @@ service BackendService {
     TCheckStorageFormatResult check_storage_format();
 
     TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest 
ingest_binlog_request);
+
+    TPublishTopicResult publish_topic_info(1:TPublishTopicRequest 
topic_request);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to