This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1ba8a9bae40 [feature-wip](executor)Fe send topic info to be (#25798)
1ba8a9bae40 is described below
commit 1ba8a9bae40922275e09832a0d2ea589565d0d02
Author: wangbo <[email protected]>
AuthorDate: Thu Oct 26 15:52:48 2023 +0800
[feature-wip](executor)Fe send topic info to be (#25798)
---
be/src/agent/agent_server.cpp | 13 +--
be/src/agent/agent_server.h | 5 +-
be/src/agent/topic_listener.h | 11 +-
be/src/agent/topic_subscriber.cpp | 40 ++-----
be/src/agent/topic_subscriber.h | 16 +--
be/src/agent/user_resource_listener.cpp | 105 ------------------
be/src/agent/workload_group_listener.cpp | 43 ++++++++
...source_listener.h => workload_group_listener.h} | 26 ++---
be/src/pipeline/task_scheduler.cpp | 2 +-
be/src/runtime/task_group/task_group_manager.cpp | 50 +++++++++
be/src/runtime/task_group/task_group_manager.h | 2 +
be/src/service/backend_service.h | 5 +
be/src/vec/exec/scan/scanner_scheduler.h | 11 +-
.../main/java/org/apache/doris/common/Config.java | 2 +
.../main/java/org/apache/doris/catalog/Env.java | 11 ++
.../doris/common/publish/AckResponseHandler.java | 13 ++-
.../doris/common/publish/TopicPublisher.java | 21 +---
.../doris/common/publish/TopicPublisherThread.java | 121 +++++++++++++++++++++
...nseHandler.java => WorkloadGroupPublisher.java} | 29 ++---
.../resource/workloadgroup/WorkloadGroup.java | 12 ++
.../resource/workloadgroup/WorkloadGroupMgr.java | 15 +++
.../org/apache/doris/common/GenericPoolTest.java | 7 ++
.../apache/doris/utframe/MockedBackendFactory.java | 7 ++
gensrc/thrift/AgentService.thrift | 3 +-
gensrc/thrift/BackendService.thrift | 20 ++++
25 files changed, 375 insertions(+), 215 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 46609023337..8198c38e45e 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -29,8 +29,8 @@
#include "agent/task_worker_pool.h"
#include "agent/topic_subscriber.h"
-#include "agent/user_resource_listener.h"
#include "agent/utils.h"
+#include "agent/workload_group_listener.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
@@ -39,10 +39,6 @@
#include "olap/snapshot_manager.h"
#include "runtime/exec_env.h"
-namespace doris {
-class TopicListener;
-} // namespace doris
-
using std::string;
using std::vector;
@@ -134,9 +130,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
- TopicListener* user_resource_listener = new UserResourceListener(exec_env,
master_info);
- LOG(INFO) << "Register user resource listener";
- _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE,
user_resource_listener);
+ std::unique_ptr<TopicListener> wg_listener =
std::make_unique<WorkloadGroupListener>(exec_env);
+ LOG(INFO) << "Register workload group listener";
+
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
+ std::move(wg_listener));
#endif
}
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index daa1823b07e..a3e5cbd745e 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -23,10 +23,11 @@
#include <string>
#include <vector>
+#include "agent/topic_subscriber.h"
+
namespace doris {
class TaskWorkerPool;
-class TopicSubscriber;
class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
@@ -52,6 +53,8 @@ public:
// [[deprecated]]
void publish_cluster_state(TAgentResult& agent_result, const
TAgentPublishRequest& request);
+ TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
+
private:
DISALLOW_COPY_AND_ASSIGN(AgentServer);
diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h
index 0ef9c597f19..af99a78545b 100644
--- a/be/src/agent/topic_listener.h
+++ b/be/src/agent/topic_listener.h
@@ -17,19 +17,14 @@
#pragma once
-#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/BackendService_types.h>
namespace doris {
class TopicListener {
public:
virtual ~TopicListener() {}
- // Deal with a single update
- //
- // Input parameters:
- // protocol version: the version for the protocol, listeners should deal
with the msg according to the protocol
- // topic_update: single update
- virtual void handle_update(const TAgentServiceVersion::type&
protocol_version,
- const TTopicUpdate& topic_update) = 0;
+
+ virtual void handle_topic_info(const TPublishTopicRequest& topic_request)
= 0;
};
} // namespace doris
diff --git a/be/src/agent/topic_subscriber.cpp
b/be/src/agent/topic_subscriber.cpp
index c5c4a324ebe..c3bcc29c623 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -17,7 +17,7 @@
#include "agent/topic_subscriber.h"
-#include <gen_cpp/AgentService_types.h>
+#include <pthread.h>
#include <mutex>
#include <utility>
@@ -28,38 +28,22 @@ namespace doris {
TopicSubscriber::TopicSubscriber() {}
-TopicSubscriber::~TopicSubscriber() {
- // Delete all listeners in the register
- std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it =
- _registered_listeners.begin();
- for (; it != _registered_listeners.end(); ++it) {
- std::vector<TopicListener*>& listeners = it->second;
- std::vector<TopicListener*>::iterator listener_it = listeners.begin();
- for (; listener_it != listeners.end(); ++listener_it) {
- delete *listener_it;
- }
- }
-}
-
-void TopicSubscriber::register_listener(TTopicType::type topic_type,
TopicListener* listener) {
+void TopicSubscriber::register_listener(TTopicInfoType::type topic_type,
+ std::unique_ptr<TopicListener>
topic_listener) {
// Unique lock here to prevent access to listeners
std::lock_guard<std::shared_mutex> lock(_listener_mtx);
- this->_registered_listeners[topic_type].push_back(listener);
+ this->_registered_listeners.emplace(topic_type, std::move(topic_listener));
}
-void TopicSubscriber::handle_updates(const TAgentPublishRequest&
agent_publish_request) {
- // Shared lock here in order to avoid updates in listeners' map
+void TopicSubscriber::handle_topic_info(const TPublishTopicRequest&
topic_request) {
+ // NOTE(wb): if we found there is bottleneck for handle_topic_info by
LOG(INFO)
+ // eg, update workload info may delay other listener, then we need add a
thread here
+ // to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
- // Currently, not deal with protocol version, the listener should deal
with protocol version
- const std::vector<TTopicUpdate>& topic_updates =
agent_publish_request.updates;
- std::vector<TTopicUpdate>::const_iterator topic_update_it =
topic_updates.begin();
- for (; topic_update_it != topic_updates.end(); ++topic_update_it) {
- std::vector<TopicListener*>& listeners =
this->_registered_listeners[topic_update_it->type];
- std::vector<TopicListener*>::iterator listener_it = listeners.begin();
- // Send the update to all listeners with protocol version.
- for (; listener_it != listeners.end(); ++listener_it) {
-
(*listener_it)->handle_update(agent_publish_request.protocol_version,
*topic_update_it);
- }
+ LOG(INFO) << "begin handle topic info";
+ for (auto& listener_pair : _registered_listeners) {
+ listener_pair.second->handle_topic_info(topic_request);
+ LOG(INFO) << "handle topic " << listener_pair.first << " succ";
}
}
} // namespace doris
diff --git a/be/src/agent/topic_subscriber.h b/be/src/agent/topic_subscriber.h
index 490bd35d2b0..7adcd0ea372 100644
--- a/be/src/agent/topic_subscriber.h
+++ b/be/src/agent/topic_subscriber.h
@@ -17,7 +17,8 @@
#pragma once
-#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/BackendService_types.h>
+#include <glog/logging.h>
#include <map>
#include <shared_mutex>
@@ -29,14 +30,15 @@ class TopicListener;
class TopicSubscriber {
public:
TopicSubscriber();
- ~TopicSubscriber();
- // Put the topic type and listener to the map
- void register_listener(TTopicType::type topic_type, TopicListener*
listener);
- // Handle all updates in the request
- void handle_updates(const TAgentPublishRequest& agent_publish_request);
+ ~TopicSubscriber() = default;
+
+ void register_listener(TTopicInfoType::type topic_type,
+ std::unique_ptr<TopicListener> topic_listener);
+
+ void handle_topic_info(const TPublishTopicRequest& topic_request);
private:
- std::map<TTopicType::type, std::vector<TopicListener*>>
_registered_listeners;
+ std::map<TTopicInfoType::type, std::unique_ptr<TopicListener>>
_registered_listeners;
std::shared_mutex _listener_mtx;
};
} // namespace doris
diff --git a/be/src/agent/user_resource_listener.cpp
b/be/src/agent/user_resource_listener.cpp
deleted file mode 100644
index 6a73edf6dbc..00000000000
--- a/be/src/agent/user_resource_listener.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "agent/user_resource_listener.h"
-
-#include <gen_cpp/AgentService_types.h>
-#include <gen_cpp/FrontendService.h>
-#include <gen_cpp/HeartbeatService_types.h>
-#include <gen_cpp/MasterService_types.h>
-#include <gen_cpp/Types_types.h>
-#include <glog/logging.h>
-#include <thrift/Thrift.h>
-#include <thrift/transport/TTransportException.h>
-
-#include <future>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-
-namespace doris {
-
-using std::string;
-using apache::thrift::TException;
-using apache::thrift::transport::TTransportException;
-
-// Initialize the resource to cgroups file mapping
-// TRESOURCE_IOPS not mapped
-
-UserResourceListener::UserResourceListener(ExecEnv* exec_env, const
TMasterInfo& master_info)
- : _master_info(master_info), _exec_env(exec_env) {}
-
-UserResourceListener::~UserResourceListener() {}
-
-void UserResourceListener::handle_update(const TAgentServiceVersion::type&
protocol_version,
- const TTopicUpdate& topic_update) {
- std::vector<TTopicItem> updates = topic_update.updates;
- if (updates.size() > 0) {
- int64_t new_version = updates[0].int_value;
- // Async call to update users resource method
- auto res = std::async(std::launch::async,
&UserResourceListener::update_users_resource,
- this, new_version);
- res.get();
- }
-}
-
-void UserResourceListener::update_users_resource(int64_t new_version) {
- // Call fe to get latest user resource
- Status master_status;
- // using 500ms as default timeout value
- FrontendServiceConnection client(_exec_env->frontend_client_cache(),
- _master_info.network_address,
config::thrift_rpc_timeout_ms,
- &master_status);
- TFetchResourceResult new_fetched_resource;
- if (!master_status.ok()) {
- LOG(ERROR) << "Get frontend client failed, with address:"
- << _master_info.network_address.hostname << ":"
- << _master_info.network_address.port;
- return;
- }
- try {
- try {
- client->fetchResource(new_fetched_resource);
- } catch (TTransportException& e) {
- // reopen the client and set timeout to 500ms
- master_status = client.reopen(config::thrift_rpc_timeout_ms);
-
- if (!master_status.ok()) {
- LOG(WARNING) << "Reopen to get frontend client failed, with
address:"
- << _master_info.network_address.hostname << ":"
- << _master_info.network_address.port << ",
reason=" << e.what();
- return;
- }
- LOG(WARNING) << "fetchResource from frontend failed"
- << ", reason=" << e.what();
- client->fetchResource(new_fetched_resource);
- }
- } catch (TException& e) {
- // Already try twice, log here
- static_cast<void>(client.reopen(config::thrift_rpc_timeout_ms));
- LOG(WARNING) << "retry to fetchResource from " <<
_master_info.network_address.hostname
- << ":" << _master_info.network_address.port << "
failed:\n"
- << e.what();
- return;
- }
-}
-} // namespace doris
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
new file mode 100644
index 00000000000..bf27861c284
--- /dev/null
+++ b/be/src/agent/workload_group_listener.cpp
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "agent/workload_group_listener.h"
+
+#include <charconv>
+#include <set>
+#include <string>
+#include <system_error>
+
+#include "runtime/task_group/task_group.h"
+#include "runtime/task_group/task_group_manager.h"
+#include "util/mem_info.h"
+#include "util/parse_util.h"
+
+namespace doris {
+
+// Reconciles this BE's workload groups with the list carried by `topic_request`:
+// every group whose id is absent from the WORKLOAD_GROUP topics is removed through
+// TaskGroupManager::delete_task_group_by_ids().
+//
+// Because absence means "delete", the id set must be complete. If any WORKLOAD_GROUP
+// entry has a missing or unparsable "id", this round is skipped rather than guessed,
+// so a malformed entry can never drop a live group; the FE republishes periodically
+// (publish_topic_info_interval_ms), so the next round retries.
+void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) {
+    std::set<uint64_t> current_wg_ids;
+    for (const TopicInfo& topic_info : topic_request.topic_list) {
+        if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+            continue;
+        }
+
+        // Guard the lookup: dereferencing end() when "id" is missing is undefined behavior.
+        auto id_iter = topic_info.info_map.find("id");
+        if (id_iter == topic_info.info_map.end()) {
+            LOG(WARNING) << "skip workload group sync: topic info has no id, topic key="
+                         << topic_info.topic_key;
+            return;
+        }
+
+        // Parse as uint64_t, the key type TaskGroupManager uses; parsing into an int failed
+        // for ids above INT_MAX and left the id as 0, which deleted the real group.
+        const std::string& id_str = id_iter->second;
+        const char* id_end = id_str.data() + id_str.size();
+        uint64_t wg_id = 0;
+        auto [parsed_end, ec] = std::from_chars(id_str.data(), id_end, wg_id);
+        if (ec != std::errc() || parsed_end != id_end) {
+            LOG(WARNING) << "skip workload group sync: invalid workload group id '" << id_str
+                         << "'";
+            return;
+        }
+
+        current_wg_ids.insert(wg_id);
+    }
+
+    _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/agent/user_resource_listener.h
b/be/src/agent/workload_group_listener.h
similarity index 54%
rename from be/src/agent/user_resource_listener.h
rename to be/src/agent/workload_group_listener.h
index 6fd25bef671..d31b1c4ef65 100644
--- a/be/src/agent/user_resource_listener.h
+++ b/be/src/agent/workload_group_listener.h
@@ -17,31 +17,21 @@
#pragma once
-#include <gen_cpp/AgentService_types.h>
-#include <stdint.h>
+#include <glog/logging.h>
#include "agent/topic_listener.h"
+#include "runtime/exec_env.h"
namespace doris {
-class ExecEnv;
-class TMasterInfo;
-
-class UserResourceListener : public TopicListener {
+class WorkloadGroupListener : public TopicListener {
public:
- ~UserResourceListener();
- // Input parameters:
- // root_cgroups_path: root cgroups allocated by admin to doris
- UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info);
- // This method should be async
- virtual void handle_update(const TAgentServiceVersion::type&
protocol_version,
- const TTopicUpdate& topic_update);
+ ~WorkloadGroupListener() {}
+ WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+ void handle_topic_info(const TPublishTopicRequest& topic_request) override;
private:
- const TMasterInfo& _master_info;
ExecEnv* _exec_env;
- // Call cgroups mgr to update user's cgroups resource share
- // Also refresh local user resource's cache
- void update_users_resource(int64_t new_version);
};
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 4ef5def6f1d..cdd934d5c7d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -189,6 +189,7 @@ void
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
TaskScheduler::~TaskScheduler() {
stop();
+ LOG(INFO) << "Task scheduler " << _name << " shutdown";
}
Status TaskScheduler::start() {
@@ -363,7 +364,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
this->_shutdown.store(true);
- _blocked_task_scheduler->shutdown();
if (_task_queue) {
_task_queue->close();
}
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index e6ed60148e0..b3c24fa96e7 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -122,6 +122,56 @@ Status
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
return Status::OK();
}
+// Keeps only the task groups whose ids are in `id_set`; the TaskGroup entries, pipeline task
+// schedulers, scan schedulers and cgroup controllers of every other id are removed.
+void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
+    {
+        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
+            if (id_set.find(iter->first) == id_set.end()) {
+                iter = _task_groups.erase(iter);
+            } else {
+                ++iter;
+            }
+        }
+    }
+
+    // Stopping a scheduler may take a while, so the schedulers are only moved out while
+    // _task_scheduler_lock is held; they are destroyed when these sets go out of scope,
+    // after the lock is released, and their destructors call stop().
+    std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
+    std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> scan_task_sche_to_del;
+    {
+        std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
+            if (id_set.find(iter->first) == id_set.end()) {
+                // Move through the iterator instead of re-looking the key up with operator[].
+                task_sche_to_del.insert(std::move(iter->second));
+                iter = _tg_sche_map.erase(iter);
+            } else {
+                ++iter;
+            }
+        }
+
+        for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) {
+            if (id_set.find(iter->first) == id_set.end()) {
+                scan_task_sche_to_del.insert(std::move(iter->second));
+                iter = _tg_scan_sche_map.erase(iter);
+            } else {
+                ++iter;
+            }
+        }
+
+        for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) {
+            if (id_set.find(iter->first) == id_set.end()) {
+                iter = _cgroup_ctl_map.erase(iter);
+            } else {
+                ++iter;
+            }
+        }
+    }
+}
+
void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index e45cdeca7ea..ae501e93f3e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -54,6 +54,8 @@ public:
Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name,
int cpu_hard_limit,
ExecEnv* exec_env, QueryContext*
query_ctx_ptr);
+ void delete_task_group_by_ids(std::set<uint64_t> id_set);
+
void stop();
private:
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index e98dd65a8c7..8ad55e43e6e 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,6 +90,11 @@ public:
_agent_server->publish_cluster_state(result, request);
}
+ void publish_topic_info(TPublishTopicResult& result,
+ const TPublishTopicRequest& topic_request)
override {
+
_agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
+ }
+
// DorisServer service
void exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index ad6f86c4f18..32048458ed9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -144,8 +144,14 @@ public:
_wg_name = wg_name;
}
+ ~SimplifiedScanScheduler() {
+ stop();
+ LOG(INFO) << "Scanner sche " << _wg_name << " shutdown";
+ }
+
void stop() {
_is_stop.store(true);
+ _scan_task_queue->shutdown();
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
@@ -169,8 +175,9 @@ private:
void _work() {
while (!_is_stop.load()) {
SimplifiedScanTask scan_task;
- _scan_task_queue->blocking_get(&scan_task);
- scan_task.scan_func();
+ if (_scan_task_queue->blocking_get(&scan_task)) {
+ scan_task.scan_func();
+ };
}
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c50acaa0873..c9b75cb84f5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2268,4 +2268,6 @@ public class Config extends ConfigBase {
})
public static int sync_image_timeout_second = 300;
+ @ConfField(mutable = true, masterOnly = true)
+ public static int publish_topic_info_interval_ms = 30000; // 30s
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8e46daaf936..9a6d13552d6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -113,6 +113,9 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.publish.TopicPublisher;
+import org.apache.doris.common.publish.TopicPublisherThread;
+import org.apache.doris.common.publish.WorkloadGroupPublisher;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.HttpURLUtil;
@@ -492,6 +495,8 @@ public class Env {
private HiveTransactionMgr hiveTransactionMgr;
+ private TopicPublisherThread topicPublisherThread;
+
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
@@ -726,6 +731,8 @@ public class Env {
this.binlogGcer = new BinlogGcer();
this.columnIdFlusher = new ColumnIdFlushDaemon();
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
+ this.topicPublisherThread = new TopicPublisherThread(
+ "TopicPublisher", Config.publish_topic_info_interval_ms,
systemInfo);
}
public static void destroyCheckpoint() {
@@ -970,6 +977,10 @@ public class Env {
}
queryCancelWorker.start();
+
+ TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
+ topicPublisherThread.addToTopicPublisherList(wgPublisher);
+ topicPublisherThread.start();
}
// wait until FE is ready.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
index b95469d6910..f9d15a1ae5a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
@@ -30,15 +30,24 @@ public class AckResponseHandler extends ResponseHandler {
this.listener = listener;
}
+ public AckResponseHandler(Collection<Backend> nodes) {
+ super(nodes);
+ this.listener = null;
+ }
+
@Override
public void onResponse(Backend node) {
super.onResponse(node);
- listener.onResponse(node);
+ if (listener != null) {
+ listener.onResponse(node);
+ }
}
@Override
public void onFailure(Backend node, Throwable t) {
super.onFailure(node, t);
- listener.onFailure(node, t);
+ if (listener != null) {
+ listener.onFailure(node, t);
+ }
}
}
diff --git a/be/src/agent/topic_listener.h
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
similarity index 60%
copy from be/src/agent/topic_listener.h
copy to
fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
index 0ef9c597f19..24086cb0e7f 100644
--- a/be/src/agent/topic_listener.h
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java
@@ -15,21 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+package org.apache.doris.common.publish;
-#include <gen_cpp/AgentService_types.h>
+import org.apache.doris.thrift.TPublishTopicRequest;
-namespace doris {
-
-class TopicListener {
-public:
- virtual ~TopicListener() {}
- // Deal with a single update
- //
- // Input parameters:
- // protocol version: the version for the protocol, listeners should deal
with the msg according to the protocol
- // topic_update: single update
- virtual void handle_update(const TAgentServiceVersion::type&
protocol_version,
- const TTopicUpdate& topic_update) = 0;
-};
-} // namespace doris
+public interface TopicPublisher {
+ public void getTopicInfo(TPublishTopicRequest req);
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
new file mode 100644
index 00000000000..616c8a30b56
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.publish;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class TopicPublisherThread extends MasterDaemon {
+
+    private static final Logger LOG = LogManager.getLogger(TopicPublisherThread.class);
+
+    private SystemInfoService clusterInfoService;
+
+    private ExecutorService executor = ThreadPoolManager
+            .newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true);
+
+    public TopicPublisherThread(String name, long intervalMs,
+            SystemInfoService clusterInfoService) {
+        super(name, intervalMs);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private List<TopicPublisher> topicPublisherList = new ArrayList<>();
+
+    public void addToTopicPublisherList(TopicPublisher topicPublisher) {
+        this.topicPublisherList.add(topicPublisher);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Config.enable_workload_group) {
+            return;
+        }
+        LOG.info("begin publish topic info");
+        // step 1: get all publish topic info
+        TPublishTopicRequest request = new TPublishTopicRequest();
+        for (TopicPublisher topicPublisher : topicPublisherList) {
+            topicPublisher.getTopicInfo(request);
+        }
+
+        // step 2: publish topic info to all be
+        Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values();
+        AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
+        for (Backend be : nodesToPublish) {
+            executor.submit(new TopicPublishWorker(request, be, handler));
+        }
+        try {
+            int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2;
+            if (!handler.awaitAllInMs(timeoutMs)) {
+                Backend[] backends = handler.pendingNodes();
+                if (backends.length > 0) {
+                    LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})",
+                            Arrays.toString(backends));
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public class TopicPublishWorker implements Runnable {
+        private TPublishTopicRequest request;
+        private Backend be;
+        private ResponseHandler handler;
+
+        public TopicPublishWorker(TPublishTopicRequest request, Backend node, ResponseHandler handler) {
+            this.request = request;
+            this.be = node;
+            this.handler = handler;
+        }
+
+        // Sends the topic request to one backend and always reports back to `handler`.
+        // The borrowed thrift client is returned to ClientPool on success and invalidated on
+        // failure (its transport may be broken); previously it was never handed back, leaking
+        // one pooled client per backend on every publish round.
+        @Override
+        public void run() {
+            long beginTime = System.currentTimeMillis();
+            TNetworkAddress addr = null;
+            BackendService.Client client = null;
+            boolean ok = false;
+            try {
+                addr = new TNetworkAddress(be.getHost(), be.getBePort());
+                client = ClientPool.backendPool.borrowObject(addr);
+                client.publishTopicInfo(request);
+                ok = true;
+                LOG.info("publish topic info to be {} success, time cost={} ms",
+                        be.getHost(), (System.currentTimeMillis() - beginTime));
+            } catch (Exception e) {
+                LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
+                        be.getHost(), (System.currentTimeMillis() - beginTime), e);
+            } finally {
+                if (client != null) {
+                    try {
+                        if (ok) {
+                            ClientPool.backendPool.returnObject(addr, client);
+                        } else {
+                            ClientPool.backendPool.invalidateObject(addr, client);
+                        }
+                    } catch (Throwable t) {
+                        LOG.warn("recycle topic publish client to be {} failed", be.getHost(), t);
+                    }
+                }
+                handler.onResponse(be);
+            }
+        }
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
similarity index 59%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
copy to
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index b95469d6910..2330700ce7b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -17,28 +17,23 @@
package org.apache.doris.common.publish;
-import org.apache.doris.system.Backend;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TopicInfo;
-import java.util.Collection;
+public class WorkloadGroupPublisher implements TopicPublisher {
-// Response handler contain a listener
-public class AckResponseHandler extends ResponseHandler {
- private Listener listener;
+ private Env env;
- public AckResponseHandler(Collection<Backend> nodes, Listener listener) {
- super(nodes);
- this.listener = listener;
+ public WorkloadGroupPublisher(Env env) {
+ this.env = env;
}
@Override
- public void onResponse(Backend node) {
- super.onResponse(node);
- listener.onResponse(node);
- }
-
- @Override
- public void onFailure(Backend node, Throwable t) {
- super.onFailure(node, t);
- listener.onFailure(node, t);
+ public void getTopicInfo(TPublishTopicRequest req) {
+ for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
+ .getPublishTopicInfo()) {
+ req.addToTopicList(topicInfo);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 68115c4c842..b8aebdccab0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -290,6 +292,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
return new
TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
}
+ public TopicInfo toTopicInfo() {
+ HashMap<String, String> newHashMap = new HashMap<>();
+ newHashMap.put("id", String.valueOf(id));
+ TopicInfo topicInfo = new TopicInfo();
+ topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP);
+ topicInfo.setInfoMap(newHashMap);
+ topicInfo.setTopicKey(name);
+ return topicInfo;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 1ec47d053fe..5f0a52cab55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -133,6 +135,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return workloadGroups;
}
+ public List<TopicInfo> getPublishTopicInfo() {
+ List<TopicInfo> workloadGroups = new ArrayList();
+ readLock();
+ try {
+ for (WorkloadGroup wg : idToWorkloadGroup.values()) {
+ workloadGroups.add(wg.toTopicInfo());
+ }
+ } finally {
+ readUnlock();
+ }
+ return workloadGroups;
+ }
+
public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context)
throws UserException {
String groupName = getWorkloadGroupNameAndCheckPriv(context);
readLock();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 3ab76732f61..a2f3867de2b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -32,6 +32,8 @@ import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -139,6 +141,11 @@ public class GenericPoolTest {
return null;
}
+ @Override
+ public TPublishTopicResult publishTopicInfo(TPublishTopicRequest
request) throws TException {
+ return null;
+ }
+
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest)
throws TException {
// TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index d3f5aeeacc2..058e356a325 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -299,6 +301,11 @@ public class MockedBackendFactory {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
+ @Override
+ public TPublishTopicResult publishTopicInfo(TPublishTopicRequest
request) throws TException {
+ return new TPublishTopicResult(new TStatus(TStatusCode.OK));
+ }
+
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws
TException {
return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 4b4c260b473..c161fc99e71 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -506,5 +506,4 @@ struct TTopicUpdate {
struct TAgentPublishRequest {
1: required TAgentServiceVersion protocol_version
2: required list<TTopicUpdate> updates
-}
-
+}
\ No newline at end of file
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 3d77eab4cad..a7a9c50aed2 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -138,6 +138,24 @@ struct TIngestBinlogResult {
1: optional Status.TStatus status;
}
+enum TTopicInfoType {
+ WORKLOAD_GROUP
+}
+
+struct TopicInfo {
+ 1: optional string topic_key
+ 2: required TTopicInfoType topic_type
+ 3: optional map<string, string> info_map
+}
+
+struct TPublishTopicRequest {
+ 1: required list<TopicInfo> topic_list
+}
+
+struct TPublishTopicResult {
+ 1: required Status.TStatus status
+}
+
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in
backend.
// Returns as soon as all incoming data streams have been set up.
@@ -193,4 +211,6 @@ service BackendService {
TCheckStorageFormatResult check_storage_format();
TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest
ingest_binlog_request);
+
+ TPublishTopicResult publish_topic_info(1:TPublishTopicRequest
topic_request);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]