This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 98be2231847 [feature](merge-cloud) Implement cloud mixin service
(#30605)
98be2231847 is described below
commit 98be223184724f9e7491fb59c080a437632ec522
Author: walter <[email protected]>
AuthorDate: Wed Jan 31 19:14:03 2024 +0800
[feature](merge-cloud) Implement cloud mixin service (#30605)
Co-authored-by: platoneko <[email protected]>
---
be/src/agent/agent_server.cpp | 276 +++++++++++---------------------
be/src/agent/agent_server.h | 52 ++----
be/src/agent/task_worker_pool.cpp | 20 ++-
be/src/agent/task_worker_pool.h | 20 ++-
be/src/cloud/cloud_backend_service.cpp | 48 ++++++
be/src/cloud/cloud_backend_service.h | 41 +++++
be/src/cloud/cloud_internal_service.cpp | 27 ++++
be/src/cloud/cloud_internal_service.h | 38 +++++
be/src/cloud/cloud_tablet_mgr.cpp | 3 +-
be/src/olap/tablet_meta.cpp | 15 +-
be/src/runtime/exec_env_init.cpp | 2 +
be/src/runtime/load_stream_mgr.cpp | 2 +-
be/src/service/backend_service.cpp | 117 +++++++++++---
be/src/service/backend_service.h | 44 +++--
be/src/service/brpc_service.cpp | 14 +-
be/src/service/doris_main.cpp | 17 +-
be/test/agent/task_worker_pool_test.cpp | 22 +--
17 files changed, 467 insertions(+), 291 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 7b77105a4aa..d2a6e605b88 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -24,6 +24,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <filesystem>
+#include <memory>
#include <ostream>
#include <string>
@@ -49,24 +50,8 @@ namespace doris {
AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
- for (const auto& path : exec_env->store_paths()) {
- try {
- string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
- std::filesystem::path dpp_download_path(dpp_download_path_str);
- if (std::filesystem::exists(dpp_download_path)) {
- std::filesystem::remove_all(dpp_download_path);
- }
- } catch (...) {
- LOG(WARNING) << "boost exception when remove dpp download path.
path=" << path.path;
- }
- }
-
MasterServerClient::create(master_info);
-#ifndef BE_TEST
- start_workers(exec_env);
-#endif
-
#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
std::unique_ptr<TopicListener> wg_listener =
std::make_unique<WorkloadGroupListener>(exec_env);
@@ -85,84 +70,135 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
AgentServer::~AgentServer() = default;
-void AgentServer::start_workers(ExecEnv* exec_env) {
- // TODO(plat1ko): CloudStorageEngine
- auto& engine = ExecEnv::GetInstance()->storage_engine().to_local();
+class PushTaskWorkerPool final : public TaskWorkerPoolIf {
+public:
+ PushTaskWorkerPool(StorageEngine& engine)
+ : _push_delete_workers(
+ TaskWorkerPool("DELETE", config::delete_worker_count,
+ [&engine](auto&& task) {
push_callback(engine, task); })),
+ _push_load_workers(PriorTaskWorkerPool(
+ "PUSH", config::push_worker_count_normal_priority,
+ config::push_worker_count_high_priority,
+ [&engine](auto&& task) { push_callback(engine, task);
})) {}
+
+ ~PushTaskWorkerPool() override { stop(); }
+
+ void stop() {
+ _push_delete_workers.stop();
+ _push_load_workers.stop();
+ }
+
+ Status submit_task(const TAgentTaskRequest& task) override {
+ if (task.push_req.push_type == TPushType::LOAD_V2) {
+ return _push_load_workers.submit_task(task);
+ } else if (task.push_req.push_type == TPushType::DELETE) {
+ return _push_delete_workers.submit_task(task);
+ } else {
+ return Status::InvalidArgument(
+ "task(signature={}, type={}, push_type={}) has wrong
push_type", task.signature,
+ task.task_type, task.push_req.push_type);
+ }
+ }
+
+private:
+ TaskWorkerPool _push_delete_workers;
+ PriorTaskWorkerPool _push_load_workers;
+};
+
+void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
+ for (const auto& path : exec_env->store_paths()) {
+ try {
+ string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
+ std::filesystem::path dpp_download_path(dpp_download_path_str);
+ if (std::filesystem::exists(dpp_download_path)) {
+ std::filesystem::remove_all(dpp_download_path);
+ }
+ } catch (...) {
+ LOG(WARNING) << "boost exception when remove dpp download path.
path=" << path.path;
+ }
+ }
+
// clang-format off
- _alter_inverted_index_workers = std::make_unique<TaskWorkerPool>(
- "ALTER_INVERTED_INDEX", config::alter_index_worker_count,
[&engine](auto&& task) { return alter_inverted_index_callback(engine, task); });
+ _workers[TTaskType::ALTER_INVERTED_INDEX] =
std::make_unique<TaskWorkerPool>(
+ "ALTER_INVERTED_INDEX", config::alter_index_worker_count,
[&engine](auto&& task) { return alter_inverted_index_callback(engine, task); });
- _check_consistency_workers = std::make_unique<TaskWorkerPool>(
- "CHECK_CONSISTENCY", config::check_consistency_worker_count,
[&engine](auto&& task) { return check_consistency_callback(engine, task); });
+ _workers[TTaskType::CHECK_CONSISTENCY] = std::make_unique<TaskWorkerPool>(
+ "CHECK_CONSISTENCY", config::check_consistency_worker_count,
[&engine](auto&& task) { return check_consistency_callback(engine, task); });
- _upload_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::UPLOAD] = std::make_unique<TaskWorkerPool>(
"UPLOAD", config::upload_worker_count, [&engine, exec_env](auto&&
task) { return upload_callback(engine, exec_env, task); });
- _download_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::DOWNLOAD] = std::make_unique<TaskWorkerPool>(
"DOWNLOAD", config::download_worker_count, [&engine,
exec_env](auto&& task) { return download_callback(engine, exec_env, task); });
- _make_snapshot_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::MAKE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
"MAKE_SNAPSHOT", config::make_snapshot_worker_count,
[&engine](auto&& task) { return make_snapshot_callback(engine, task); });
- _release_snapshot_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::RELEASE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
"RELEASE_SNAPSHOT", config::release_snapshot_worker_count,
[&engine](auto&& task) { return release_snapshot_callback(engine, task); });
- _move_dir_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::MOVE] = std::make_unique<TaskWorkerPool>(
"MOVE", 1, [&engine, exec_env](auto&& task) { return
move_dir_callback(engine, exec_env, task); });
- _submit_table_compaction_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::COMPACTION] = std::make_unique<TaskWorkerPool>(
"SUBMIT_TABLE_COMPACTION", 1, [&engine](auto&& task) { return
submit_table_compaction_callback(engine, task); });
- _push_storage_policy_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::PUSH_STORAGE_POLICY] =
std::make_unique<TaskWorkerPool>(
"PUSH_STORAGE_POLICY", 1, [&engine](auto&& task) { return
push_storage_policy_callback(engine, task); });
- _push_cooldown_conf_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::PUSH_COOLDOWN_CONF] = std::make_unique<TaskWorkerPool>(
"PUSH_COOLDOWN_CONF", 1, [&engine](auto&& task) { return
push_cooldown_conf_callback(engine, task); });
- _create_tablet_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::CREATE] = std::make_unique<TaskWorkerPool>(
"CREATE_TABLE", config::create_tablet_worker_count,
[&engine](auto&& task) { return create_tablet_callback(engine, task); });
- _drop_tablet_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
"DROP_TABLE", config::drop_tablet_worker_count, [&engine](auto&&
task) { return drop_tablet_callback(engine, task); });
- _publish_version_workers =
std::make_unique<PublishVersionWorkerPool>(engine);
+ _workers[TTaskType::PUBLISH_VERSION] =
std::make_unique<PublishVersionWorkerPool>(engine);
- _clear_transaction_task_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::CLEAR_TRANSACTION_TASK] =
std::make_unique<TaskWorkerPool>(
"CLEAR_TRANSACTION_TASK",
config::clear_transaction_task_worker_count, [&engine](auto&& task) { return
clear_transaction_task_callback(engine, task); });
- _push_delete_workers = std::make_unique<TaskWorkerPool>(
- "DELETE", config::delete_worker_count, [&engine](auto&& task) {
push_callback(engine, task); });
-
- // Both PUSH and REALTIME_PUSH type use push_callback
- _push_load_workers = std::make_unique<PriorTaskWorkerPool>(
- "PUSH", config::push_worker_count_normal_priority,
config::push_worker_count_high_priority, [&engine](auto&& task) {
push_callback(engine, task); });
+ _workers[TTaskType::PUSH] = std::make_unique<PushTaskWorkerPool>(engine);
- _update_tablet_meta_info_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::UPDATE_TABLET_META_INFO] =
std::make_unique<TaskWorkerPool>(
"UPDATE_TABLET_META_INFO", 1, [&engine](auto&& task) { return
update_tablet_meta_callback(engine, task); });
- _alter_tablet_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&&
task) { return alter_tablet_callback(engine, task); });
- _clone_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
"CLONE", config::clone_worker_count, [&engine, &master_info =
_master_info](auto&& task) { return clone_callback(engine, master_info, task);
});
- _storage_medium_migrate_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] =
std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count,
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task);
});
- _gc_binlog_workers = std::make_unique<TaskWorkerPool>(
+ _workers[TTaskType::GC_BINLOG] = std::make_unique<TaskWorkerPool>(
"GC_BINLOG", 1, [&engine](auto&& task) { return
gc_binlog_callback(engine, task); });
- _report_task_workers = std::make_unique<ReportWorker>(
- "REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); });
+ _report_workers.push_back(std::make_unique<ReportWorker>(
+ "REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));
- _report_disk_state_workers = std::make_unique<ReportWorker>(
- "REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds, [&engine, &master_info =
_master_info] { report_disk_callback(engine, master_info); });
+ _report_workers.push_back(std::make_unique<ReportWorker>(
+ "REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds, [&engine, &master_info =
_master_info] { report_disk_callback(engine, master_info); }));
- _report_tablet_workers = std::make_unique<ReportWorker>(
- "REPORT_OLAP_TABLE", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); });
+ _report_workers.push_back(std::make_unique<ReportWorker>(
+ "REPORT_OLAP_TABLE", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); }));
// clang-format on
}
+void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv*
exec_env) {
+ // TODO(plat1ko): DELETE worker
+ // TODO(plat1ko): ALTER worker
+ // TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker
+ // TODO(plat1ko): CALCULATE_DELETE_BITMAP worker
+
+ _report_workers.push_back(std::make_unique<ReportWorker>(
+ "REPORT_TASK", _master_info, config::report_task_interval_seconds,
+ [&master_info = _master_info] { report_task_callback(master_info);
}));
+}
+
// TODO(lingbin): each task in the batch may have it own status or FE must
check and
// resend request when something is wrong(BE may need some logic to guarantee
idempotence.
void AgentServer::submit_tasks(TAgentResult& agent_result,
@@ -178,101 +214,18 @@ void AgentServer::submit_tasks(TAgentResult&
agent_result,
for (auto&& task : tasks) {
VLOG_RPC << "submit one task: " <<
apache::thrift::ThriftDebugString(task).c_str();
- TTaskType::type task_type = task.task_type;
+ auto task_type = task.task_type;
+ if (task_type == TTaskType::REALTIME_PUSH) {
+ task_type = TTaskType::PUSH;
+ }
int64_t signature = task.signature;
-
-#define HANDLE_TYPE(t_task_type, work_pool, req_member)
\
- case t_task_type:
\
- if (task.__isset.req_member) {
\
- work_pool->submit_task(task);
\
- } else {
\
- ret_st = Status::InvalidArgument("task(signature={}) has wrong
request member = {}", \
- signature, #req_member);
\
- }
\
- break;
-
- // TODO(lingbin): It still too long, divided these task types into
several categories
- switch (task_type) {
- HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers,
create_tablet_req);
- HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers,
drop_tablet_req);
- HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers,
publish_version_req);
- HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
_clear_transaction_task_workers,
- clear_transaction_task_req);
- HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
- HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
_storage_medium_migrate_workers,
- storage_medium_migrate_req);
- HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
_check_consistency_workers,
- check_consistency_req);
- HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
- HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
- HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers,
snapshot_req);
- HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers,
- release_snapshot_req);
- HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
- HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
_update_tablet_meta_info_workers,
- update_tablet_meta_info_req);
- HANDLE_TYPE(TTaskType::COMPACTION,
_submit_table_compaction_workers, compaction_req);
- HANDLE_TYPE(TTaskType::PUSH_STORAGE_POLICY,
_push_storage_policy_workers,
- push_storage_policy_req);
-
- case TTaskType::REALTIME_PUSH:
- case TTaskType::PUSH:
- if (!task.__isset.push_req) {
- ret_st = Status::InvalidArgument(
- "task(signature={}) has wrong request member =
push_req", signature);
- break;
- }
- if (task.push_req.push_type == TPushType::LOAD_V2) {
- _push_load_workers->submit_task(task);
- } else if (task.push_req.push_type == TPushType::DELETE) {
- _push_delete_workers->submit_task(task);
- } else {
- ret_st = Status::InvalidArgument(
- "task(signature={}, type={}, push_type={}) has wrong
push_type", signature,
- task_type, task.push_req.push_type);
- }
- break;
- case TTaskType::ALTER:
- if (task.__isset.alter_tablet_req ||
task.__isset.alter_tablet_req_v2) {
- _alter_tablet_workers->submit_task(task);
- } else {
- ret_st = Status::InvalidArgument(
- "task(signature={}) has wrong request member =
alter_tablet_req",
- signature);
- }
- break;
- case TTaskType::ALTER_INVERTED_INDEX:
- if (task.__isset.alter_inverted_index_req) {
- _alter_inverted_index_workers->submit_task(task);
- } else {
- ret_st = Status::InvalidArgument(strings::Substitute(
- "task(signature=$0) has wrong request member =
alter_inverted_index_req",
- signature));
- }
- break;
- case TTaskType::PUSH_COOLDOWN_CONF:
- if (task.__isset.push_cooldown_conf) {
- _push_cooldown_conf_workers->submit_task(task);
- } else {
- ret_st = Status::InvalidArgument(
- "task(signature={}) has wrong request member =
push_cooldown_conf",
- signature);
- }
- break;
- case TTaskType::GC_BINLOG:
- if (task.__isset.gc_binlog_req) {
- _gc_binlog_workers->submit_task(task);
- } else {
- ret_st = Status::InvalidArgument(
- "task(signature={}) has wrong request member =
gc_binlog_req", signature);
- }
- break;
- default:
+ if (auto it = _workers.find(task_type); it != _workers.end()) {
+ auto& worker = it->second;
+ ret_st = worker->submit_task(task);
+ } else {
ret_st = Status::InvalidArgument("task(signature={}, type={}) has
wrong task type",
- signature, task_type);
- break;
+ signature, task.task_type);
}
-#undef HANDLE_TYPE
if (!ret_st.ok()) {
LOG_WARNING("failed to submit task").tag("task",
task).error(ret_st);
@@ -292,41 +245,6 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
ret_st.to_thrift(&agent_result.status);
}
-void AgentServer::make_snapshot(StorageEngine& engine, TAgentResult&
t_agent_result,
- const TSnapshotRequest& snapshot_request) {
- string snapshot_path;
- bool allow_incremental_clone = false;
- Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request,
&snapshot_path,
-
&allow_incremental_clone);
- if (!status) {
- LOG_WARNING("failed to make snapshot")
- .tag("tablet_id", snapshot_request.tablet_id)
- .tag("schema_hash", snapshot_request.schema_hash)
- .error(status);
- } else {
- LOG_INFO("successfully make snapshot")
- .tag("tablet_id", snapshot_request.tablet_id)
- .tag("schema_hash", snapshot_request.schema_hash)
- .tag("snapshot_path", snapshot_path);
- t_agent_result.__set_snapshot_path(snapshot_path);
- t_agent_result.__set_allow_incremental_clone(allow_incremental_clone);
- }
-
- status.to_thrift(&t_agent_result.status);
-
t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
-}
-
-void AgentServer::release_snapshot(StorageEngine& engine, TAgentResult&
t_agent_result,
- const std::string& snapshot_path) {
- Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);
- if (!status) {
- LOG_WARNING("failed to release snapshot").tag("snapshot_path",
snapshot_path).error(status);
- } else {
- LOG_INFO("successfully release snapshot").tag("snapshot_path",
snapshot_path);
- }
- status.to_thrift(&t_agent_result.status);
-}
-
void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
const TAgentPublishRequest& request) {
Status status = Status::NotSupported("deprecated
method(publish_cluster_state) was invoked");
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 6748b10d8b2..761bb9e4b67 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -21,10 +21,12 @@
#include <memory>
#include <string>
+#include <unordered_map>
#include <vector>
namespace doris {
+class TaskWorkerPoolIf;
class TaskWorkerPool;
class PriorTaskWorkerPool;
class ReportWorker;
@@ -36,6 +38,7 @@ class TAgentTaskRequest;
class TMasterInfo;
class TSnapshotRequest;
class StorageEngine;
+class CloudStorageEngine;
// Each method corresponds to one RPC from FE Master, see BackendService.
class AgentServer {
@@ -43,15 +46,13 @@ public:
explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
~AgentServer();
+ void start_workers(StorageEngine& engine, ExecEnv* exec_env);
+
+ void cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env);
+
// Receive agent task from FE master
void submit_tasks(TAgentResult& agent_result, const
std::vector<TAgentTaskRequest>& tasks);
- // TODO(lingbin): make the agent_result to be a pointer, because it will
be modified.
- static void make_snapshot(StorageEngine& engine, TAgentResult&
agent_result,
- const TSnapshotRequest& snapshot_request);
- static void release_snapshot(StorageEngine& engine, TAgentResult&
agent_result,
- const std::string& snapshot_path);
-
// Deprecated
// TODO(lingbin): This method is deprecated, should be removed later.
// [[deprecated]]
@@ -60,43 +61,16 @@ public:
TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
private:
- void start_workers(ExecEnv* exec_env);
-
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
- std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
- std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
- std::unique_ptr<PriorTaskWorkerPool> _push_load_workers;
- std::unique_ptr<TaskWorkerPool> _publish_version_workers;
- std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
- std::unique_ptr<TaskWorkerPool> _push_delete_workers;
- std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
- std::unique_ptr<TaskWorkerPool> _alter_inverted_index_workers;
- std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
- std::unique_ptr<TaskWorkerPool> _clone_workers;
- std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
- std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
-
- // These 3 worker-pool do not accept tasks from FE.
- // It is self triggered periodically and reports to Fe master
- std::unique_ptr<ReportWorker> _report_task_workers;
- std::unique_ptr<ReportWorker> _report_disk_state_workers;
- std::unique_ptr<ReportWorker> _report_tablet_workers;
-
- std::unique_ptr<TaskWorkerPool> _upload_workers;
- std::unique_ptr<TaskWorkerPool> _download_workers;
- std::unique_ptr<TaskWorkerPool> _make_snapshot_workers;
- std::unique_ptr<TaskWorkerPool> _release_snapshot_workers;
- std::unique_ptr<TaskWorkerPool> _move_dir_workers;
- std::unique_ptr<TaskWorkerPool> _recover_tablet_workers;
- std::unique_ptr<TaskWorkerPool> _update_tablet_meta_info_workers;
-
- std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;
-
- std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
+ std::unordered_map<int64_t /* TTaskType */,
std::unique_ptr<TaskWorkerPoolIf>> _workers;
+
+ // These workers do not accept tasks from FE.
+ // It is self triggered periodically and reports to FE master
+ std::vector<std::unique_ptr<ReportWorker>> _report_workers;
+
std::unique_ptr<TopicSubscriber> _topic_subscriber;
- std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
};
} // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index aa54a3d0e70..0c5ee012542 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -303,8 +303,8 @@ bool handle_report(const TReportRequest& request, const
TMasterInfo& master_info
return true;
}
-void _submit_task(const TAgentTaskRequest& task,
- std::function<Status(const TAgentTaskRequest&)> submit_op) {
+Status _submit_task(const TAgentTaskRequest& task,
+ std::function<Status(const TAgentTaskRequest&)> submit_op)
{
const TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;
@@ -314,18 +314,22 @@ void _submit_task(const TAgentTaskRequest& task,
if (!register_task_info(task_type, signature)) {
LOG_WARNING("failed to register task").tag("type",
type_str).tag("signature", signature);
- return;
+ // Duplicated task request, just return OK
+ return Status::OK();
}
+ // TODO(plat1ko): check task request member
+
// Set the receiving time of task so that we can determine whether it is
timed out later
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
auto st = submit_op(task);
if (!st.ok()) [[unlikely]] {
LOG_INFO("failed to submit task").tag("type",
type_str).tag("signature", signature);
- return;
+ return st;
}
LOG_INFO("successfully submit task").tag("type",
type_str).tag("signature", signature);
+ return Status::OK();
}
bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
@@ -426,8 +430,8 @@ void TaskWorkerPool::stop() {
}
}
-void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
- _submit_task(task, [this](auto&& task) {
+Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+ return _submit_task(task, [this](auto&& task) {
add_task_count(task, 1);
return _thread_pool->submit_func([this, task]() {
_callback(task);
@@ -484,8 +488,8 @@ void PriorTaskWorkerPool::stop() {
}
}
-void PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
- _submit_task(task, [this](auto&& task) {
+Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+ return _submit_task(task, [this](auto&& task) {
auto req = std::make_unique<TAgentTaskRequest>(task);
add_task_count(*req, 1);
if (req->__isset.priority && req->priority == TPriority::HIGH) {
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 241bec98578..5d440377724 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -26,6 +26,7 @@
#include <string>
#include <string_view>
+#include "common/status.h"
#include "gutil/ref_counted.h"
namespace doris {
@@ -39,16 +40,23 @@ class TReportRequest;
class TTabletInfo;
class TAgentTaskRequest;
-class TaskWorkerPool {
+class TaskWorkerPoolIf {
+public:
+ virtual ~TaskWorkerPoolIf() = default;
+
+ virtual Status submit_task(const TAgentTaskRequest& task) = 0;
+};
+
+class TaskWorkerPool : public TaskWorkerPoolIf {
public:
TaskWorkerPool(std::string_view name, int worker_count,
std::function<void(const TAgentTaskRequest&)> callback);
- virtual ~TaskWorkerPool();
+ ~TaskWorkerPool() override;
void stop();
- void submit_task(const TAgentTaskRequest& task);
+ Status submit_task(const TAgentTaskRequest& task) override;
protected:
std::atomic_bool _stopped {false};
@@ -68,16 +76,16 @@ private:
StorageEngine& _engine;
};
-class PriorTaskWorkerPool {
+class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int
high_prior_worker_conut,
std::function<void(const TAgentTaskRequest& task)>
callback);
- ~PriorTaskWorkerPool();
+ ~PriorTaskWorkerPool() override;
void stop();
- void submit_task(const TAgentTaskRequest& task);
+ Status submit_task(const TAgentTaskRequest& task) override;
private:
void normal_loop();
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
new file mode 100644
index 00000000000..23a75ffda76
--- /dev/null
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_backend_service.h"
+
+#include "common/config.h"
+#include "util/thrift_server.h"
+
+namespace doris {
+
+CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv*
exec_env)
+ : BaseBackendService(exec_env), _engine(engine) {}
+
+CloudBackendService::~CloudBackendService() = default;
+
+Status CloudBackendService::create_service(CloudStorageEngine& engine,
ExecEnv* exec_env, int port,
+ std::unique_ptr<ThriftServer>*
server) {
+ auto service = std::make_shared<CloudBackendService>(engine, exec_env);
+ service->_agent_server->cloud_start_workers(engine, exec_env);
+ // TODO: do we want a BoostThreadFactory?
+ // TODO: we want separate thread factories here, so that fe requests can't
starve
+ // be requests
+ // std::shared_ptr<TProcessor> be_processor =
std::make_shared<BackendServiceProcessor>(service);
+ auto be_processor = std::make_shared<BackendServiceProcessor>(service);
+
+ *server = std::make_unique<ThriftServer>("backend", be_processor, port,
+ config::be_service_threads);
+
+ LOG(INFO) << "Doris CloudBackendService listening on " << port;
+
+ return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_backend_service.h
b/be/src/cloud/cloud_backend_service.h
new file mode 100644
index 00000000000..6c9a710c3a9
--- /dev/null
+++ b/be/src/cloud/cloud_backend_service.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "service/backend_service.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+class CloudBackendService final : public BaseBackendService {
+public:
+ static Status create_service(CloudStorageEngine& engine, ExecEnv*
exec_env, int port,
+ std::unique_ptr<ThriftServer>* server);
+
+ CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env);
+
+ ~CloudBackendService() override;
+
+ // TODO(plat1ko): cloud backend functions
+
+private:
+ [[maybe_unused]] CloudStorageEngine& _engine;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
new file mode 100644
index 00000000000..0ebbe1c20c6
--- /dev/null
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_internal_service.h"
+
+namespace doris {
+
+CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine,
ExecEnv* exec_env)
+ : PInternalService(exec_env), _engine(engine) {}
+
+CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_internal_service.h
b/be/src/cloud/cloud_internal_service.h
new file mode 100644
index 00000000000..6399a8923fa
--- /dev/null
+++ b/be/src/cloud/cloud_internal_service.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "service/internal_service.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+class CloudInternalServiceImpl final : public PInternalService {
+public:
+ CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env);
+
+ ~CloudInternalServiceImpl() override;
+
+ // TODO(plat1ko): cloud internal service functions
+
+private:
+ [[maybe_unused]] CloudStorageEngine& _engine;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index c6a794d02a6..daffc458583 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -185,7 +185,8 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
delete value1;
};
- auto* handle = cache->insert(key, value.release(), 1, deleter);
+ auto* handle = cache->insert(key, value.release(), 1, deleter, CachePriority::NORMAL,
+ sizeof(CloudTablet));
auto ret = std::shared_ptr<CloudTablet>(
tablet.get(), [cache, handle](...) {
cache->release(handle); });
_tablet_map->put(std::move(tablet));
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 2d131afa870..20117a89496 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -29,6 +29,7 @@
#include <set>
#include <utility>
+#include "cloud/config.h"
#include "common/config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
@@ -644,12 +645,16 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
break;
}
- for (auto& rs : _rs_metas) {
- rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
- }
- for (auto rs : _stale_rs_metas) {
- rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
+ // RowsetMetaPB is separated from TabletMetaPB
+ if (!config::is_cloud_mode()) {
+ for (auto& rs : _rs_metas) {
+ rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+ }
+ for (auto rs : _stale_rs_metas) {
+ rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
+ }
}
+
_schema->to_schema_pb(tablet_meta_pb->mutable_schema());
tablet_meta_pb->set_in_restore_mode(in_restore_mode());
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 688d17dfa1e..9942262f9c9 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -261,8 +261,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
options.broken_paths = broken_paths;
options.backend_uid = doris::UniqueId::gen_uid();
if (config::is_cloud_mode()) {
+ std::cout << "start BE in cloud mode" << std::endl;
_storage_engine =
std::make_unique<CloudStorageEngine>(options.backend_uid);
} else {
+ std::cout << "start BE in local mode" << std::endl;
_storage_engine = std::make_unique<StorageEngine>(options);
}
auto st = _storage_engine->open();
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 8d9d37c5d3a..7d9de12d078 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -52,7 +52,7 @@ Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
UniqueId load_id(request->load_id());
{
- std::lock_guard<decltype(_lock)> l(_lock);
+ std::lock_guard l(_lock);
auto it = _load_streams_map.find(load_id);
if (it != _load_streams_map.end()) {
load_stream = it->second;
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 6d8435d230a..50d28fd91ce 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -53,6 +53,7 @@
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
+#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
@@ -360,26 +361,20 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
}
} // namespace
-using apache::thrift::TException;
-using apache::thrift::TProcessor;
-using apache::thrift::TMultiplexedProcessor;
-using apache::thrift::transport::TTransportException;
-using apache::thrift::concurrency::ThreadFactory;
-
BaseBackendService::BaseBackendService(ExecEnv* exec_env)
: _exec_env(exec_env), _agent_server(new AgentServer(exec_env,
*exec_env->master_info())) {}
+BaseBackendService::~BaseBackendService() = default;
+
BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env)
: BaseBackendService(exec_env), _engine(engine) {}
-Status BaseBackendService::create_service(ExecEnv* exec_env, int port,
- std::unique_ptr<ThriftServer>* server) {
- if (config::is_cloud_mode()) {
- // TODO(plat1ko): cloud mode
- return Status::NotSupported("Currently only support local storage
engine");
- }
- auto service =
- std::make_shared<BackendService>(exec_env->storage_engine().to_local(), exec_env);
+BackendService::~BackendService() = default;
+
+Status BackendService::create_service(StorageEngine& engine, ExecEnv* exec_env, int port,
+ std::unique_ptr<ThriftServer>* server) {
+ auto service = std::make_shared<BackendService>(engine, exec_env);
+ service->_agent_server->start_workers(engine, exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't
starve
// be requests
@@ -695,12 +690,37 @@ void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
void BackendService::make_snapshot(TAgentResult& return_value,
const TSnapshotRequest& snapshot_request) {
- _agent_server->make_snapshot(_engine, return_value, snapshot_request);
+ std::string snapshot_path;
+ bool allow_incremental_clone = false;
+ Status status = _engine.snapshot_mgr()->make_snapshot(snapshot_request,
&snapshot_path,
+
&allow_incremental_clone);
+ if (!status) {
+ LOG_WARNING("failed to make snapshot")
+ .tag("tablet_id", snapshot_request.tablet_id)
+ .tag("schema_hash", snapshot_request.schema_hash)
+ .error(status);
+ } else {
+ LOG_INFO("successfully make snapshot")
+ .tag("tablet_id", snapshot_request.tablet_id)
+ .tag("schema_hash", snapshot_request.schema_hash)
+ .tag("snapshot_path", snapshot_path);
+ return_value.__set_snapshot_path(snapshot_path);
+ return_value.__set_allow_incremental_clone(allow_incremental_clone);
+ }
+
+ status.to_thrift(&return_value.status);
+
return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
}
void BackendService::release_snapshot(TAgentResult& return_value,
const std::string& snapshot_path) {
- _agent_server->release_snapshot(_engine, return_value, snapshot_path);
+ Status status = _engine.snapshot_mgr()->release_snapshot(snapshot_path);
+ if (!status) {
+ LOG_WARNING("failed to release snapshot").tag("snapshot_path",
snapshot_path).error(status);
+ } else {
+ LOG_INFO("successfully release snapshot").tag("snapshot_path",
snapshot_path);
+ }
+ status.to_thrift(&return_value.status);
}
void BackendService::ingest_binlog(TIngestBinlogResult& result,
@@ -903,29 +923,84 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
}
}
+void BaseBackendService::get_tablet_stat(TTabletStatResult& result) {
+ LOG(ERROR) << "get_tablet_stat is not implemented";
+}
+
+int64_t BaseBackendService::get_trash_used_capacity() {
+ LOG(ERROR) << "get_trash_used_capacity is not implemented";
+ return 0;
+}
+
+void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult&
result,
+ int64_t
last_stream_record_time) {
+ LOG(ERROR) << "get_stream_load_record is not implemented";
+}
+
+void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
+ LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
+}
+
+void BaseBackendService::clean_trash() {
+ LOG(ERROR) << "clean_trash is not implemented";
+}
+
+void BaseBackendService::make_snapshot(TAgentResult& return_value,
+ const TSnapshotRequest&
snapshot_request) {
+ LOG(ERROR) << "make_snapshot is not implemented";
+ return_value.__set_status(Status::NotSupported("make_snapshot is not
implemented").to_thrift());
+}
+
+void BaseBackendService::release_snapshot(TAgentResult& return_value,
+ const std::string& snapshot_path) {
+ LOG(ERROR) << "release_snapshot is not implemented";
+ return_value.__set_status(
+ Status::NotSupported("release_snapshot is not
implemented").to_thrift());
+}
+
+void BaseBackendService::check_storage_format(TCheckStorageFormatResult&
result) {
+ LOG(ERROR) << "check_storage_format is not implemented";
+}
+
+void BaseBackendService::ingest_binlog(TIngestBinlogResult& result,
+ const TIngestBinlogRequest& request) {
+ LOG(ERROR) << "ingest_binlog is not implemented";
+ result.__set_status(Status::NotSupported("ingest_binlog is not
implemented").to_thrift());
+}
+
+void BaseBackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+ const TQueryIngestBinlogRequest&
request) {
+ LOG(ERROR) << "query_ingest_binlog is not implemented";
+ result.__set_status(TIngestBinlogStatus::UNKNOWN);
+ result.__set_err_msg("query_ingest_binlog is not implemented");
+}
+
void BaseBackendService::pre_cache_async(TPreCacheAsyncResponse& response,
const TPreCacheAsyncRequest& request)
{
- LOG(FATAL) << "BackendService is not implemented";
+ LOG(ERROR) << "pre_cache_async is not implemented";
+ response.__set_status(Status::NotSupported("pre_cache_async is not
implemented").to_thrift());
}
void BaseBackendService::check_pre_cache(TCheckPreCacheResponse& response,
const TCheckPreCacheRequest& request)
{
- LOG(FATAL) << "BackendService is not implemented";
+ LOG(ERROR) << "check_pre_cache is not implemented";
+ response.__set_status(Status::NotSupported("check_pre_cache is not
implemented").to_thrift());
}
void BaseBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&
response,
const
TSyncLoadForTabletsRequest& request) {
- LOG(FATAL) << "BackendService is not implemented";
+ LOG(ERROR) << "sync_load_for_tablets is not implemented";
}
void
BaseBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse&
response,
const
TGetTopNHotPartitionsRequest& request) {
- LOG(FATAL) << "BackendService is not implemented";
+ LOG(ERROR) << "get_top_n_hot_partitions is not implemented";
}
void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
const TWarmUpTabletsRequest& request)
{
- LOG(FATAL) << "BackendService is not implemented";
+ LOG(ERROR) << "warm_up_tablets is not implemented";
+ response.__set_status(Status::NotSupported("warm_up_tablets is not
implemented").to_thrift());
}
} // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 01c1a2abf00..3aaee529735 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -18,7 +18,6 @@
#pragma once
#include <gen_cpp/BackendService.h>
-#include <stdint.h>
#include <memory>
#include <string>
@@ -68,11 +67,7 @@ class BaseBackendService : public BackendServiceIf {
public:
BaseBackendService(ExecEnv* exec_env);
- ~BaseBackendService() override = default;
-
- // NOTE: now we do not support multiple backend in one process
- static Status create_service(ExecEnv* exec_env, int port,
- std::unique_ptr<ThriftServer>* server);
+ ~BaseBackendService() override;
// Agent service
void submit_tasks(TAgentResult& return_value,
@@ -116,8 +111,32 @@ public:
// used for external service, close some context and release resource
related with this context
void close_scanner(TScanCloseResult& result_, const TScanCloseParams&
params) override;
- // TODO(AlexYue): The below cloud backend functions should be implemented
in
- // CloudBackendService
+
////////////////////////////////////////////////////////////////////////////
+ // begin local backend functions
+
////////////////////////////////////////////////////////////////////////////
+ void get_tablet_stat(TTabletStatResult& result) override;
+
+ int64_t get_trash_used_capacity() override;
+
+ void get_stream_load_record(TStreamLoadRecordResult& result,
+ int64_t last_stream_record_time) override;
+
+ void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>&
diskTrashInfos) override;
+
+ void clean_trash() override;
+
+ void make_snapshot(TAgentResult& return_value,
+ const TSnapshotRequest& snapshot_request) override;
+
+ void release_snapshot(TAgentResult& return_value, const std::string&
snapshot_path) override;
+
+ void check_storage_format(TCheckStorageFormatResult& result) override;
+
+ void ingest_binlog(TIngestBinlogResult& result, const
TIngestBinlogRequest& request) override;
+
+ void query_ingest_binlog(TQueryIngestBinlogResult& result,
+ const TQueryIngestBinlogRequest& request)
override;
+
////////////////////////////////////////////////////////////////////////////
// begin cloud backend functions
////////////////////////////////////////////////////////////////////////////
@@ -137,9 +156,6 @@ public:
void warm_up_tablets(TWarmUpTabletsResponse& response,
const TWarmUpTabletsRequest& request) override;
-
////////////////////////////////////////////////////////////////////////////
- // end cloud backend functions
-
////////////////////////////////////////////////////////////////////////////
protected:
Status start_plan_fragment_execution(const TExecPlanFragmentParams&
exec_params);
@@ -151,9 +167,13 @@ protected:
// `StorageEngine` mixin for `BaseBackendService`
class BackendService final : public BaseBackendService {
public:
+ // NOTE: now we do not support multiple backend in one process
+ static Status create_service(StorageEngine& engine, ExecEnv* exec_env, int
port,
+ std::unique_ptr<ThriftServer>* server);
+
BackendService(StorageEngine& engine, ExecEnv* exec_env);
- ~BackendService() override = default;
+ ~BackendService() override;
void get_tablet_stat(TTabletStatResult& result) override;
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 8bd147800a5..534c7b7b0c6 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -27,6 +27,7 @@
#include <ostream>
+#include "cloud/cloud_internal_service.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
@@ -59,13 +60,16 @@ BRpcService::~BRpcService() {
}
Status BRpcService::start(int port, int num_threads) {
+ // Add service
if (config::is_cloud_mode()) {
- // TODO(plat1ko): cloud mode
- return Status::NotSupported("Currently only support local storage
engine");
+ _server->AddService(
+ new
CloudInternalServiceImpl(_exec_env->storage_engine().to_cloud(), _exec_env),
+ brpc::SERVER_OWNS_SERVICE);
+ } else {
+ _server->AddService(
+ new
PInternalServiceImpl(_exec_env->storage_engine().to_local(), _exec_env),
+ brpc::SERVER_OWNS_SERVICE);
}
- // Add service
- _server->AddService(new
PInternalServiceImpl(_exec_env->storage_engine().to_local(), _exec_env),
- brpc::SERVER_OWNS_SERVICE);
// start service
brpc::ServerOptions options;
if (num_threads != -1) {
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index f013b83e68d..1ecf1a7a0d4 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -39,6 +39,8 @@
#include <tuple>
#include <vector>
+#include "cloud/cloud_backend_service.h"
+#include "cloud/config.h"
#include "common/stack_trace.h"
#include "olap/tablet_schema_cache.h"
#include "olap/utils.h"
@@ -483,7 +485,7 @@ int main(int argc, char** argv) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
// init exec env
- auto exec_env(doris::ExecEnv::GetInstance());
+ auto* exec_env(doris::ExecEnv::GetInstance());
status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths,
broken_paths);
if (status != Status::OK()) {
LOG(ERROR) << "failed to init doris storage engine, res=" << status;
@@ -494,8 +496,17 @@ int main(int argc, char** argv) {
doris::ThriftRpcHelper::setup(exec_env);
// 1. thrift server with be_port
std::unique_ptr<doris::ThriftServer> be_server;
- EXIT_IF_ERROR(
- doris::BackendService::create_service(exec_env,
doris::config::be_port, &be_server));
+
+ if (doris::config::is_cloud_mode()) {
+ EXIT_IF_ERROR(doris::CloudBackendService::create_service(
+ exec_env->storage_engine().to_cloud(), exec_env,
doris::config::be_port,
+ &be_server));
+ } else {
+
EXIT_IF_ERROR(doris::BackendService::create_service(exec_env->storage_engine().to_local(),
+ exec_env,
doris::config::be_port,
+ &be_server));
+ }
+
status = be_server->start();
if (!status.ok()) {
LOG(ERROR) << "Doris Be server did not start correctly, exiting";
diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp
index 5fbe652dda9..0c7fd88396b 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -41,14 +41,14 @@ TEST(TaskWorkerPoolTest, TaskWorkerPool) {
TAgentTaskRequest task;
task.__set_signature(-1);
- workers.submit_task(task);
- workers.submit_task(task);
- workers.submit_task(task); // Pending and ignored when stop
+ auto _ = workers.submit_task(task);
+ _ = workers.submit_task(task);
+ _ = workers.submit_task(task); // Pending and ignored when stop
std::this_thread::sleep_for(200ms);
workers.stop();
- workers.submit_task(task); // Ignore
+ _ = workers.submit_task(task); // Ignore
EXPECT_EQ(count.load(), 2);
}
@@ -69,13 +69,13 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
TAgentTaskRequest task;
task.__set_signature(-1);
task.__set_priority(TPriority::NORMAL);
- workers.submit_task(task);
- workers.submit_task(task);
+ auto _ = workers.submit_task(task);
+ _ = workers.submit_task(task);
std::this_thread::sleep_for(200ms);
task.__set_priority(TPriority::HIGH);
// Normal pool is busy, but high prior pool should be idle
- workers.submit_task(task);
+ _ = workers.submit_task(task);
std::this_thread::sleep_for(500ms);
EXPECT_EQ(normal_count.load(), 0);
EXPECT_EQ(high_prior_count.load(), 1);
@@ -84,8 +84,8 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
EXPECT_EQ(normal_count.load(), 2);
EXPECT_EQ(high_prior_count.load(), 1);
// Both normal and high prior pool are idle
- workers.submit_task(task);
- workers.submit_task(task);
+ _ = workers.submit_task(task);
+ _ = workers.submit_task(task);
std::this_thread::sleep_for(500ms);
EXPECT_EQ(normal_count.load(), 2);
@@ -96,7 +96,7 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
EXPECT_EQ(normal_count.load(), 2);
EXPECT_EQ(high_prior_count.load(), 3);
- workers.submit_task(task); // Ignore
+ _ = workers.submit_task(task); // Ignore
EXPECT_EQ(normal_count.load(), 2);
EXPECT_EQ(high_prior_count.load(), 3);
@@ -110,7 +110,7 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
std::atomic_int count {0};
ReportWorker worker("test", master_info, 1, [&] { ++count; });
- worker.notify(); // Not received heartbeat yet, igonre
+ worker.notify(); // Not received heartbeat yet, ignore
std::this_thread::sleep_for(100ms);
master_info.network_address.__set_port(9030);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]