This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98be2231847 [feature](merge-cloud) Implement cloud mixin service 
(#30605)
98be2231847 is described below

commit 98be223184724f9e7491fb59c080a437632ec522
Author: walter <[email protected]>
AuthorDate: Wed Jan 31 19:14:03 2024 +0800

    [feature](merge-cloud) Implement cloud mixin service (#30605)
    
    Co-authored-by: platoneko <[email protected]>
---
 be/src/agent/agent_server.cpp           | 276 +++++++++++---------------------
 be/src/agent/agent_server.h             |  52 ++----
 be/src/agent/task_worker_pool.cpp       |  20 ++-
 be/src/agent/task_worker_pool.h         |  20 ++-
 be/src/cloud/cloud_backend_service.cpp  |  48 ++++++
 be/src/cloud/cloud_backend_service.h    |  41 +++++
 be/src/cloud/cloud_internal_service.cpp |  27 ++++
 be/src/cloud/cloud_internal_service.h   |  38 +++++
 be/src/cloud/cloud_tablet_mgr.cpp       |   3 +-
 be/src/olap/tablet_meta.cpp             |  15 +-
 be/src/runtime/exec_env_init.cpp        |   2 +
 be/src/runtime/load_stream_mgr.cpp      |   2 +-
 be/src/service/backend_service.cpp      | 117 +++++++++++---
 be/src/service/backend_service.h        |  44 +++--
 be/src/service/brpc_service.cpp         |  14 +-
 be/src/service/doris_main.cpp           |  17 +-
 be/test/agent/task_worker_pool_test.cpp |  22 +--
 17 files changed, 467 insertions(+), 291 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 7b77105a4aa..d2a6e605b88 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -24,6 +24,7 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include <filesystem>
+#include <memory>
 #include <ostream>
 #include <string>
 
@@ -49,24 +50,8 @@ namespace doris {
 
 AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
         : _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
-    for (const auto& path : exec_env->store_paths()) {
-        try {
-            string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
-            std::filesystem::path dpp_download_path(dpp_download_path_str);
-            if (std::filesystem::exists(dpp_download_path)) {
-                std::filesystem::remove_all(dpp_download_path);
-            }
-        } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. 
path=" << path.path;
-        }
-    }
-
     MasterServerClient::create(master_info);
 
-#ifndef BE_TEST
-    start_workers(exec_env);
-#endif
-
 #if !defined(BE_TEST) && !defined(__APPLE__)
     // Add subscriber here and register listeners
     std::unique_ptr<TopicListener> wg_listener = 
std::make_unique<WorkloadGroupListener>(exec_env);
@@ -85,84 +70,135 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
 
 AgentServer::~AgentServer() = default;
 
-void AgentServer::start_workers(ExecEnv* exec_env) {
-    // TODO(plat1ko): CloudStorageEngine
-    auto& engine = ExecEnv::GetInstance()->storage_engine().to_local();
+class PushTaskWorkerPool final : public TaskWorkerPoolIf {
+public:
+    PushTaskWorkerPool(StorageEngine& engine)
+            : _push_delete_workers(
+                      TaskWorkerPool("DELETE", config::delete_worker_count,
+                                     [&engine](auto&& task) { 
push_callback(engine, task); })),
+              _push_load_workers(PriorTaskWorkerPool(
+                      "PUSH", config::push_worker_count_normal_priority,
+                      config::push_worker_count_high_priority,
+                      [&engine](auto&& task) { push_callback(engine, task); 
})) {}
+
+    ~PushTaskWorkerPool() override { stop(); }
+
+    void stop() {
+        _push_delete_workers.stop();
+        _push_load_workers.stop();
+    }
+
+    Status submit_task(const TAgentTaskRequest& task) override {
+        if (task.push_req.push_type == TPushType::LOAD_V2) {
+            return _push_load_workers.submit_task(task);
+        } else if (task.push_req.push_type == TPushType::DELETE) {
+            return _push_delete_workers.submit_task(task);
+        } else {
+            return Status::InvalidArgument(
+                    "task(signature={}, type={}, push_type={}) has wrong 
push_type", task.signature,
+                    task.task_type, task.push_req.push_type);
+        }
+    }
+
+private:
+    TaskWorkerPool _push_delete_workers;
+    PriorTaskWorkerPool _push_load_workers;
+};
+
+void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
+    for (const auto& path : exec_env->store_paths()) {
+        try {
+            string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
+            std::filesystem::path dpp_download_path(dpp_download_path_str);
+            if (std::filesystem::exists(dpp_download_path)) {
+                std::filesystem::remove_all(dpp_download_path);
+            }
+        } catch (...) {
+            LOG(WARNING) << "boost exception when remove dpp download path. 
path=" << path.path;
+        }
+    }
+
     // clang-format off
-    _alter_inverted_index_workers = std::make_unique<TaskWorkerPool>(
-            "ALTER_INVERTED_INDEX", config::alter_index_worker_count, 
[&engine](auto&& task) { return alter_inverted_index_callback(engine, task); });
+    _workers[TTaskType::ALTER_INVERTED_INDEX] = 
std::make_unique<TaskWorkerPool>(
+        "ALTER_INVERTED_INDEX", config::alter_index_worker_count, 
[&engine](auto&& task) { return alter_inverted_index_callback(engine, task); });
 
-    _check_consistency_workers = std::make_unique<TaskWorkerPool>(
-            "CHECK_CONSISTENCY", config::check_consistency_worker_count, 
[&engine](auto&& task) { return check_consistency_callback(engine, task); });
+    _workers[TTaskType::CHECK_CONSISTENCY] = std::make_unique<TaskWorkerPool>(
+        "CHECK_CONSISTENCY", config::check_consistency_worker_count, 
[&engine](auto&& task) { return check_consistency_callback(engine, task); });
 
-    _upload_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::UPLOAD] = std::make_unique<TaskWorkerPool>(
             "UPLOAD", config::upload_worker_count, [&engine, exec_env](auto&& 
task) { return upload_callback(engine, exec_env, task); });
 
-    _download_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::DOWNLOAD] = std::make_unique<TaskWorkerPool>(
             "DOWNLOAD", config::download_worker_count, [&engine, 
exec_env](auto&& task) { return download_callback(engine, exec_env, task); });
 
-    _make_snapshot_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::MAKE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
             "MAKE_SNAPSHOT", config::make_snapshot_worker_count, 
[&engine](auto&& task) { return make_snapshot_callback(engine, task); });
 
-    _release_snapshot_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::RELEASE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
             "RELEASE_SNAPSHOT", config::release_snapshot_worker_count, 
[&engine](auto&& task) { return release_snapshot_callback(engine, task); });
 
-    _move_dir_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::MOVE] = std::make_unique<TaskWorkerPool>(
             "MOVE", 1, [&engine, exec_env](auto&& task) { return 
move_dir_callback(engine, exec_env, task); });
 
-    _submit_table_compaction_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::COMPACTION] = std::make_unique<TaskWorkerPool>(
             "SUBMIT_TABLE_COMPACTION", 1, [&engine](auto&& task) { return 
submit_table_compaction_callback(engine, task); });
 
-    _push_storage_policy_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::PUSH_STORAGE_POLICY] = 
std::make_unique<TaskWorkerPool>(
             "PUSH_STORAGE_POLICY", 1, [&engine](auto&& task) { return 
push_storage_policy_callback(engine, task); });
 
-    _push_cooldown_conf_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::PUSH_COOLDOWN_CONF] = std::make_unique<TaskWorkerPool>(
             "PUSH_COOLDOWN_CONF", 1, [&engine](auto&& task) { return 
push_cooldown_conf_callback(engine, task); });
 
-    _create_tablet_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::CREATE] = std::make_unique<TaskWorkerPool>(
             "CREATE_TABLE", config::create_tablet_worker_count, 
[&engine](auto&& task) { return create_tablet_callback(engine, task); });
 
-    _drop_tablet_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
             "DROP_TABLE", config::drop_tablet_worker_count, [&engine](auto&& 
task) { return drop_tablet_callback(engine, task); });
 
-    _publish_version_workers = 
std::make_unique<PublishVersionWorkerPool>(engine);
+    _workers[TTaskType::PUBLISH_VERSION] = 
std::make_unique<PublishVersionWorkerPool>(engine);
 
-    _clear_transaction_task_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::CLEAR_TRANSACTION_TASK] = 
std::make_unique<TaskWorkerPool>(
             "CLEAR_TRANSACTION_TASK", 
config::clear_transaction_task_worker_count, [&engine](auto&& task) { return 
clear_transaction_task_callback(engine, task); });
 
-    _push_delete_workers = std::make_unique<TaskWorkerPool>(
-            "DELETE", config::delete_worker_count, [&engine](auto&& task) { 
push_callback(engine, task); });
-
-    // Both PUSH and REALTIME_PUSH type use push_callback
-    _push_load_workers = std::make_unique<PriorTaskWorkerPool>(
-            "PUSH", config::push_worker_count_normal_priority, 
config::push_worker_count_high_priority, [&engine](auto&& task) { 
push_callback(engine, task); });
+    _workers[TTaskType::PUSH] = std::make_unique<PushTaskWorkerPool>(engine);
 
-    _update_tablet_meta_info_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::UPDATE_TABLET_META_INFO] = 
std::make_unique<TaskWorkerPool>(
             "UPDATE_TABLET_META_INFO", 1, [&engine](auto&& task) { return 
update_tablet_meta_callback(engine, task); });
 
-    _alter_tablet_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
             "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& 
task) { return alter_tablet_callback(engine, task); });
 
-    _clone_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
             "CLONE", config::clone_worker_count, [&engine, &master_info = 
_master_info](auto&& task) { return clone_callback(engine, master_info, task); 
});
 
-    _storage_medium_migrate_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = 
std::make_unique<TaskWorkerPool>(
             "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, 
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); 
});
 
-    _gc_binlog_workers = std::make_unique<TaskWorkerPool>(
+    _workers[TTaskType::GC_BINLOG] = std::make_unique<TaskWorkerPool>(
             "GC_BINLOG", 1, [&engine](auto&& task) { return 
gc_binlog_callback(engine, task); });
 
-    _report_task_workers = std::make_unique<ReportWorker>(
-            "REPORT_TASK", _master_info, config::report_task_interval_seconds, 
[&master_info = _master_info] { report_task_callback(master_info); });
+    _report_workers.push_back(std::make_unique<ReportWorker>(
+            "REPORT_TASK", _master_info, config::report_task_interval_seconds, 
[&master_info = _master_info] { report_task_callback(master_info); }));
 
-    _report_disk_state_workers = std::make_unique<ReportWorker>(
-            "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds, [&engine, &master_info = 
_master_info] { report_disk_callback(engine, master_info); });
+    _report_workers.push_back(std::make_unique<ReportWorker>(
+            "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds, [&engine, &master_info = 
_master_info] { report_disk_callback(engine, master_info); }));
 
-    _report_tablet_workers = std::make_unique<ReportWorker>(
-            "REPORT_OLAP_TABLE", _master_info, 
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { 
report_tablet_callback(engine, master_info); });
+    _report_workers.push_back(std::make_unique<ReportWorker>(
+            "REPORT_OLAP_TABLE", _master_info, 
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { 
report_tablet_callback(engine, master_info); }));
     // clang-format on
 }
 
+void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* 
exec_env) {
+    // TODO(plat1ko): DELETE worker
+    // TODO(plat1ko): ALTER worker
+    // TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker
+    // TODO(plat1ko): CALCULATE_DELETE_BITMAP worker
+
+    _report_workers.push_back(std::make_unique<ReportWorker>(
+            "REPORT_TASK", _master_info, config::report_task_interval_seconds,
+            [&master_info = _master_info] { report_task_callback(master_info); 
}));
+}
+
 // TODO(lingbin): each task in the batch may have it own status or FE must 
check and
 // resend request when something is wrong(BE may need some logic to guarantee 
idempotence.
 void AgentServer::submit_tasks(TAgentResult& agent_result,
@@ -178,101 +214,18 @@ void AgentServer::submit_tasks(TAgentResult& 
agent_result,
 
     for (auto&& task : tasks) {
         VLOG_RPC << "submit one task: " << 
apache::thrift::ThriftDebugString(task).c_str();
-        TTaskType::type task_type = task.task_type;
+        auto task_type = task.task_type;
+        if (task_type == TTaskType::REALTIME_PUSH) {
+            task_type = TTaskType::PUSH;
+        }
         int64_t signature = task.signature;
-
-#define HANDLE_TYPE(t_task_type, work_pool, req_member)                        
                  \
-    case t_task_type:                                                          
                  \
-        if (task.__isset.req_member) {                                         
                  \
-            work_pool->submit_task(task);                                      
                  \
-        } else {                                                               
                  \
-            ret_st = Status::InvalidArgument("task(signature={}) has wrong 
request member = {}", \
-                                             signature, #req_member);          
                  \
-        }                                                                      
                  \
-        break;
-
-        // TODO(lingbin): It still too long, divided these task types into 
several categories
-        switch (task_type) {
-            HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, 
create_tablet_req);
-            HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, 
drop_tablet_req);
-            HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, 
publish_version_req);
-            HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK, 
_clear_transaction_task_workers,
-                        clear_transaction_task_req);
-            HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
-            HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE, 
_storage_medium_migrate_workers,
-                        storage_medium_migrate_req);
-            HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY, 
_check_consistency_workers,
-                        check_consistency_req);
-            HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
-            HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
-            HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, 
snapshot_req);
-            HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers,
-                        release_snapshot_req);
-            HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
-            HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, 
_update_tablet_meta_info_workers,
-                        update_tablet_meta_info_req);
-            HANDLE_TYPE(TTaskType::COMPACTION, 
_submit_table_compaction_workers, compaction_req);
-            HANDLE_TYPE(TTaskType::PUSH_STORAGE_POLICY, 
_push_storage_policy_workers,
-                        push_storage_policy_req);
-
-        case TTaskType::REALTIME_PUSH:
-        case TTaskType::PUSH:
-            if (!task.__isset.push_req) {
-                ret_st = Status::InvalidArgument(
-                        "task(signature={}) has wrong request member = 
push_req", signature);
-                break;
-            }
-            if (task.push_req.push_type == TPushType::LOAD_V2) {
-                _push_load_workers->submit_task(task);
-            } else if (task.push_req.push_type == TPushType::DELETE) {
-                _push_delete_workers->submit_task(task);
-            } else {
-                ret_st = Status::InvalidArgument(
-                        "task(signature={}, type={}, push_type={}) has wrong 
push_type", signature,
-                        task_type, task.push_req.push_type);
-            }
-            break;
-        case TTaskType::ALTER:
-            if (task.__isset.alter_tablet_req || 
task.__isset.alter_tablet_req_v2) {
-                _alter_tablet_workers->submit_task(task);
-            } else {
-                ret_st = Status::InvalidArgument(
-                        "task(signature={}) has wrong request member = 
alter_tablet_req",
-                        signature);
-            }
-            break;
-        case TTaskType::ALTER_INVERTED_INDEX:
-            if (task.__isset.alter_inverted_index_req) {
-                _alter_inverted_index_workers->submit_task(task);
-            } else {
-                ret_st = Status::InvalidArgument(strings::Substitute(
-                        "task(signature=$0) has wrong request member = 
alter_inverted_index_req",
-                        signature));
-            }
-            break;
-        case TTaskType::PUSH_COOLDOWN_CONF:
-            if (task.__isset.push_cooldown_conf) {
-                _push_cooldown_conf_workers->submit_task(task);
-            } else {
-                ret_st = Status::InvalidArgument(
-                        "task(signature={}) has wrong request member = 
push_cooldown_conf",
-                        signature);
-            }
-            break;
-        case TTaskType::GC_BINLOG:
-            if (task.__isset.gc_binlog_req) {
-                _gc_binlog_workers->submit_task(task);
-            } else {
-                ret_st = Status::InvalidArgument(
-                        "task(signature={}) has wrong request member = 
gc_binlog_req", signature);
-            }
-            break;
-        default:
+        if (auto it = _workers.find(task_type); it != _workers.end()) {
+            auto& worker = it->second;
+            ret_st = worker->submit_task(task);
+        } else {
             ret_st = Status::InvalidArgument("task(signature={}, type={}) has 
wrong task type",
-                                             signature, task_type);
-            break;
+                                             signature, task.task_type);
         }
-#undef HANDLE_TYPE
 
         if (!ret_st.ok()) {
             LOG_WARNING("failed to submit task").tag("task", 
task).error(ret_st);
@@ -292,41 +245,6 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
     ret_st.to_thrift(&agent_result.status);
 }
 
-void AgentServer::make_snapshot(StorageEngine& engine, TAgentResult& 
t_agent_result,
-                                const TSnapshotRequest& snapshot_request) {
-    string snapshot_path;
-    bool allow_incremental_clone = false;
-    Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request, 
&snapshot_path,
-                                                         
&allow_incremental_clone);
-    if (!status) {
-        LOG_WARNING("failed to make snapshot")
-                .tag("tablet_id", snapshot_request.tablet_id)
-                .tag("schema_hash", snapshot_request.schema_hash)
-                .error(status);
-    } else {
-        LOG_INFO("successfully make snapshot")
-                .tag("tablet_id", snapshot_request.tablet_id)
-                .tag("schema_hash", snapshot_request.schema_hash)
-                .tag("snapshot_path", snapshot_path);
-        t_agent_result.__set_snapshot_path(snapshot_path);
-        t_agent_result.__set_allow_incremental_clone(allow_incremental_clone);
-    }
-
-    status.to_thrift(&t_agent_result.status);
-    
t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
-}
-
-void AgentServer::release_snapshot(StorageEngine& engine, TAgentResult& 
t_agent_result,
-                                   const std::string& snapshot_path) {
-    Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);
-    if (!status) {
-        LOG_WARNING("failed to release snapshot").tag("snapshot_path", 
snapshot_path).error(status);
-    } else {
-        LOG_INFO("successfully release snapshot").tag("snapshot_path", 
snapshot_path);
-    }
-    status.to_thrift(&t_agent_result.status);
-}
-
 void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
                                         const TAgentPublishRequest& request) {
     Status status = Status::NotSupported("deprecated 
method(publish_cluster_state) was invoked");
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 6748b10d8b2..761bb9e4b67 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -21,10 +21,12 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 namespace doris {
 
+class TaskWorkerPoolIf;
 class TaskWorkerPool;
 class PriorTaskWorkerPool;
 class ReportWorker;
@@ -36,6 +38,7 @@ class TAgentTaskRequest;
 class TMasterInfo;
 class TSnapshotRequest;
 class StorageEngine;
+class CloudStorageEngine;
 
 // Each method corresponds to one RPC from FE Master, see BackendService.
 class AgentServer {
@@ -43,15 +46,13 @@ public:
     explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
     ~AgentServer();
 
+    void start_workers(StorageEngine& engine, ExecEnv* exec_env);
+
+    void cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env);
+
     // Receive agent task from FE master
     void submit_tasks(TAgentResult& agent_result, const 
std::vector<TAgentTaskRequest>& tasks);
 
-    // TODO(lingbin): make the agent_result to be a pointer, because it will 
be modified.
-    static void make_snapshot(StorageEngine& engine, TAgentResult& 
agent_result,
-                              const TSnapshotRequest& snapshot_request);
-    static void release_snapshot(StorageEngine& engine, TAgentResult& 
agent_result,
-                                 const std::string& snapshot_path);
-
     // Deprecated
     // TODO(lingbin): This method is deprecated, should be removed later.
     // [[deprecated]]
@@ -60,43 +61,16 @@ public:
     TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
 
 private:
-    void start_workers(ExecEnv* exec_env);
-
     // Reference to the ExecEnv::_master_info
     const TMasterInfo& _master_info;
 
-    std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
-    std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
-    std::unique_ptr<PriorTaskWorkerPool> _push_load_workers;
-    std::unique_ptr<TaskWorkerPool> _publish_version_workers;
-    std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
-    std::unique_ptr<TaskWorkerPool> _push_delete_workers;
-    std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
-    std::unique_ptr<TaskWorkerPool> _alter_inverted_index_workers;
-    std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
-    std::unique_ptr<TaskWorkerPool> _clone_workers;
-    std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
-    std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
-
-    // These 3 worker-pool do not accept tasks from FE.
-    // It is self triggered periodically and reports to Fe master
-    std::unique_ptr<ReportWorker> _report_task_workers;
-    std::unique_ptr<ReportWorker> _report_disk_state_workers;
-    std::unique_ptr<ReportWorker> _report_tablet_workers;
-
-    std::unique_ptr<TaskWorkerPool> _upload_workers;
-    std::unique_ptr<TaskWorkerPool> _download_workers;
-    std::unique_ptr<TaskWorkerPool> _make_snapshot_workers;
-    std::unique_ptr<TaskWorkerPool> _release_snapshot_workers;
-    std::unique_ptr<TaskWorkerPool> _move_dir_workers;
-    std::unique_ptr<TaskWorkerPool> _recover_tablet_workers;
-    std::unique_ptr<TaskWorkerPool> _update_tablet_meta_info_workers;
-
-    std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;
-
-    std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
+    std::unordered_map<int64_t /* TTaskType */, 
std::unique_ptr<TaskWorkerPoolIf>> _workers;
+
+    // These workers do not accept tasks from FE.
+    // It is self triggered periodically and reports to FE master
+    std::vector<std::unique_ptr<ReportWorker>> _report_workers;
+
     std::unique_ptr<TopicSubscriber> _topic_subscriber;
-    std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
 };
 
 } // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index aa54a3d0e70..0c5ee012542 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -303,8 +303,8 @@ bool handle_report(const TReportRequest& request, const 
TMasterInfo& master_info
     return true;
 }
 
-void _submit_task(const TAgentTaskRequest& task,
-                  std::function<Status(const TAgentTaskRequest&)> submit_op) {
+Status _submit_task(const TAgentTaskRequest& task,
+                    std::function<Status(const TAgentTaskRequest&)> submit_op) 
{
     const TTaskType::type task_type = task.task_type;
     int64_t signature = task.signature;
 
@@ -314,18 +314,22 @@ void _submit_task(const TAgentTaskRequest& task,
 
     if (!register_task_info(task_type, signature)) {
         LOG_WARNING("failed to register task").tag("type", 
type_str).tag("signature", signature);
-        return;
+        // Duplicated task request, just return OK
+        return Status::OK();
     }
 
+    // TODO(plat1ko): check task request member
+
     // Set the receiving time of task so that we can determine whether it is 
timed out later
     (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
     auto st = submit_op(task);
     if (!st.ok()) [[unlikely]] {
         LOG_INFO("failed to submit task").tag("type", 
type_str).tag("signature", signature);
-        return;
+        return st;
     }
 
     LOG_INFO("successfully submit task").tag("type", 
type_str).tag("signature", signature);
+    return Status::OK();
 }
 
 bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
@@ -426,8 +430,8 @@ void TaskWorkerPool::stop() {
     }
 }
 
-void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
-    _submit_task(task, [this](auto&& task) {
+Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+    return _submit_task(task, [this](auto&& task) {
         add_task_count(task, 1);
         return _thread_pool->submit_func([this, task]() {
             _callback(task);
@@ -484,8 +488,8 @@ void PriorTaskWorkerPool::stop() {
     }
 }
 
-void PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
-    _submit_task(task, [this](auto&& task) {
+Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+    return _submit_task(task, [this](auto&& task) {
         auto req = std::make_unique<TAgentTaskRequest>(task);
         add_task_count(*req, 1);
         if (req->__isset.priority && req->priority == TPriority::HIGH) {
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 241bec98578..5d440377724 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -26,6 +26,7 @@
 #include <string>
 #include <string_view>
 
+#include "common/status.h"
 #include "gutil/ref_counted.h"
 
 namespace doris {
@@ -39,16 +40,23 @@ class TReportRequest;
 class TTabletInfo;
 class TAgentTaskRequest;
 
-class TaskWorkerPool {
+class TaskWorkerPoolIf {
+public:
+    virtual ~TaskWorkerPoolIf() = default;
+
+    virtual Status submit_task(const TAgentTaskRequest& task) = 0;
+};
+
+class TaskWorkerPool : public TaskWorkerPoolIf {
 public:
     TaskWorkerPool(std::string_view name, int worker_count,
                    std::function<void(const TAgentTaskRequest&)> callback);
 
-    virtual ~TaskWorkerPool();
+    ~TaskWorkerPool() override;
 
     void stop();
 
-    void submit_task(const TAgentTaskRequest& task);
+    Status submit_task(const TAgentTaskRequest& task) override;
 
 protected:
     std::atomic_bool _stopped {false};
@@ -68,16 +76,16 @@ private:
     StorageEngine& _engine;
 };
 
-class PriorTaskWorkerPool {
+class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
 public:
     PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int 
high_prior_worker_conut,
                         std::function<void(const TAgentTaskRequest& task)> 
callback);
 
-    ~PriorTaskWorkerPool();
+    ~PriorTaskWorkerPool() override;
 
     void stop();
 
-    void submit_task(const TAgentTaskRequest& task);
+    Status submit_task(const TAgentTaskRequest& task) override;
 
 private:
     void normal_loop();
diff --git a/be/src/cloud/cloud_backend_service.cpp 
b/be/src/cloud/cloud_backend_service.cpp
new file mode 100644
index 00000000000..23a75ffda76
--- /dev/null
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_backend_service.h"
+
+#include "common/config.h"
+#include "util/thrift_server.h"
+
+namespace doris {
+
+CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* 
exec_env)
+        : BaseBackendService(exec_env), _engine(engine) {}
+
+CloudBackendService::~CloudBackendService() = default;
+
+Status CloudBackendService::create_service(CloudStorageEngine& engine, 
ExecEnv* exec_env, int port,
+                                           std::unique_ptr<ThriftServer>* 
server) {
+    auto service = std::make_shared<CloudBackendService>(engine, exec_env);
+    service->_agent_server->cloud_start_workers(engine, exec_env);
+    // TODO: do we want a BoostThreadFactory?
+    // TODO: we want separate thread factories here, so that fe requests can't 
starve
+    // be requests
+    // std::shared_ptr<TProcessor> be_processor = 
std::make_shared<BackendServiceProcessor>(service);
+    auto be_processor = std::make_shared<BackendServiceProcessor>(service);
+
+    *server = std::make_unique<ThriftServer>("backend", be_processor, port,
+                                             config::be_service_threads);
+
+    LOG(INFO) << "Doris CloudBackendService listening on " << port;
+
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_backend_service.h 
b/be/src/cloud/cloud_backend_service.h
new file mode 100644
index 00000000000..6c9a710c3a9
--- /dev/null
+++ b/be/src/cloud/cloud_backend_service.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "service/backend_service.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+class CloudBackendService final : public BaseBackendService {
+public:
+    static Status create_service(CloudStorageEngine& engine, ExecEnv* 
exec_env, int port,
+                                 std::unique_ptr<ThriftServer>* server);
+
+    CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env);
+
+    ~CloudBackendService() override;
+
+    // TODO(plat1ko): cloud backend functions
+
+private:
+    [[maybe_unused]] CloudStorageEngine& _engine;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
new file mode 100644
index 00000000000..0ebbe1c20c6
--- /dev/null
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_internal_service.h"
+
+namespace doris {
+
+CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, 
ExecEnv* exec_env)
+        : PInternalService(exec_env), _engine(engine) {}
+
+CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_internal_service.h 
b/be/src/cloud/cloud_internal_service.h
new file mode 100644
index 00000000000..6399a8923fa
--- /dev/null
+++ b/be/src/cloud/cloud_internal_service.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "service/internal_service.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+class CloudInternalServiceImpl final : public PInternalService {
+public:
+    CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env);
+
+    ~CloudInternalServiceImpl() override;
+
+    // TODO(plat1ko): cloud internal service functions
+
+private:
+    [[maybe_unused]] CloudStorageEngine& _engine;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index c6a794d02a6..daffc458583 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -185,7 +185,8 @@ Result<std::shared_ptr<CloudTablet>> 
CloudTabletMgr::get_tablet(int64_t tablet_i
                 delete value1;
             };
 
-            auto* handle = cache->insert(key, value.release(), 1, deleter);
+            auto* handle = cache->insert(key, value.release(), 1, deleter, 
CachePriority::NORMAL,
+                                         sizeof(CloudTablet));
             auto ret = std::shared_ptr<CloudTablet>(
                     tablet.get(), [cache, handle](...) { 
cache->release(handle); });
             _tablet_map->put(std::move(tablet));
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 2d131afa870..20117a89496 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -29,6 +29,7 @@
 #include <set>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/config.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
@@ -644,12 +645,16 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) 
{
         break;
     }
 
-    for (auto& rs : _rs_metas) {
-        rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
-    }
-    for (auto rs : _stale_rs_metas) {
-        rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
+    // RowsetMetaPB is separated from TabletMetaPB
+    if (!config::is_cloud_mode()) {
+        for (auto& rs : _rs_metas) {
+            rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+        }
+        for (auto rs : _stale_rs_metas) {
+            rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
+        }
     }
+
     _schema->to_schema_pb(tablet_meta_pb->mutable_schema());
 
     tablet_meta_pb->set_in_restore_mode(in_restore_mode());
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 688d17dfa1e..9942262f9c9 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -261,8 +261,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     options.broken_paths = broken_paths;
     options.backend_uid = doris::UniqueId::gen_uid();
     if (config::is_cloud_mode()) {
+        std::cout << "start BE in cloud mode" << std::endl;
         _storage_engine = 
std::make_unique<CloudStorageEngine>(options.backend_uid);
     } else {
+        std::cout << "start BE in local mode" << std::endl;
         _storage_engine = std::make_unique<StorageEngine>(options);
     }
     auto st = _storage_engine->open();
diff --git a/be/src/runtime/load_stream_mgr.cpp 
b/be/src/runtime/load_stream_mgr.cpp
index 8d9d37c5d3a..7d9de12d078 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -52,7 +52,7 @@ Status LoadStreamMgr::open_load_stream(const 
POpenLoadStreamRequest* request,
     UniqueId load_id(request->load_id());
 
     {
-        std::lock_guard<decltype(_lock)> l(_lock);
+        std::lock_guard l(_lock);
         auto it = _load_streams_map.find(load_id);
         if (it != _load_streams_map.end()) {
             load_stream = it->second;
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 6d8435d230a..50d28fd91ce 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -53,6 +53,7 @@
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
@@ -360,26 +361,20 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
 }
 } // namespace
 
-using apache::thrift::TException;
-using apache::thrift::TProcessor;
-using apache::thrift::TMultiplexedProcessor;
-using apache::thrift::transport::TTransportException;
-using apache::thrift::concurrency::ThreadFactory;
-
 BaseBackendService::BaseBackendService(ExecEnv* exec_env)
         : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, 
*exec_env->master_info())) {}
 
+BaseBackendService::~BaseBackendService() = default;
+
 BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env)
         : BaseBackendService(exec_env), _engine(engine) {}
 
-Status BaseBackendService::create_service(ExecEnv* exec_env, int port,
-                                          std::unique_ptr<ThriftServer>* 
server) {
-    if (config::is_cloud_mode()) {
-        // TODO(plat1ko): cloud mode
-        return Status::NotSupported("Currently only support local storage 
engine");
-    }
-    auto service =
-            
std::make_shared<BackendService>(exec_env->storage_engine().to_local(), 
exec_env);
+BackendService::~BackendService() = default;
+
+Status BackendService::create_service(StorageEngine& engine, ExecEnv* 
exec_env, int port,
+                                      std::unique_ptr<ThriftServer>* server) {
+    auto service = std::make_shared<BackendService>(engine, exec_env);
+    service->_agent_server->start_workers(engine, exec_env);
     // TODO: do we want a BoostThreadFactory?
     // TODO: we want separate thread factories here, so that fe requests can't 
starve
     // be requests
@@ -695,12 +690,37 @@ void 
BackendService::check_storage_format(TCheckStorageFormatResult& result) {
 
 void BackendService::make_snapshot(TAgentResult& return_value,
                                    const TSnapshotRequest& snapshot_request) {
-    _agent_server->make_snapshot(_engine, return_value, snapshot_request);
+    std::string snapshot_path;
+    bool allow_incremental_clone = false;
+    Status status = _engine.snapshot_mgr()->make_snapshot(snapshot_request, 
&snapshot_path,
+                                                          
&allow_incremental_clone);
+    if (!status) {
+        LOG_WARNING("failed to make snapshot")
+                .tag("tablet_id", snapshot_request.tablet_id)
+                .tag("schema_hash", snapshot_request.schema_hash)
+                .error(status);
+    } else {
+        LOG_INFO("successfully make snapshot")
+                .tag("tablet_id", snapshot_request.tablet_id)
+                .tag("schema_hash", snapshot_request.schema_hash)
+                .tag("snapshot_path", snapshot_path);
+        return_value.__set_snapshot_path(snapshot_path);
+        return_value.__set_allow_incremental_clone(allow_incremental_clone);
+    }
+
+    status.to_thrift(&return_value.status);
+    
return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
 }
 
 void BackendService::release_snapshot(TAgentResult& return_value,
                                       const std::string& snapshot_path) {
-    _agent_server->release_snapshot(_engine, return_value, snapshot_path);
+    Status status = _engine.snapshot_mgr()->release_snapshot(snapshot_path);
+    if (!status) {
+        LOG_WARNING("failed to release snapshot").tag("snapshot_path", 
snapshot_path).error(status);
+    } else {
+        LOG_INFO("successfully release snapshot").tag("snapshot_path", 
snapshot_path);
+    }
+    status.to_thrift(&return_value.status);
 }
 
 void BackendService::ingest_binlog(TIngestBinlogResult& result,
@@ -903,29 +923,84 @@ void 
BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
     }
 }
 
+void BaseBackendService::get_tablet_stat(TTabletStatResult& result) {
+    LOG(ERROR) << "get_tablet_stat is not implemented";
+}
+
+int64_t BaseBackendService::get_trash_used_capacity() {
+    LOG(ERROR) << "get_trash_used_capacity is not implemented";
+    return 0;
+}
+
+void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& 
result,
+                                                int64_t 
last_stream_record_time) {
+    LOG(ERROR) << "get_stream_load_record is not implemented";
+}
+
+void 
BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& 
diskTrashInfos) {
+    LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
+}
+
+void BaseBackendService::clean_trash() {
+    LOG(ERROR) << "clean_trash is not implemented";
+}
+
+void BaseBackendService::make_snapshot(TAgentResult& return_value,
+                                       const TSnapshotRequest& 
snapshot_request) {
+    LOG(ERROR) << "make_snapshot is not implemented";
+    return_value.__set_status(Status::NotSupported("make_snapshot is not 
implemented").to_thrift());
+}
+
+void BaseBackendService::release_snapshot(TAgentResult& return_value,
+                                          const std::string& snapshot_path) {
+    LOG(ERROR) << "release_snapshot is not implemented";
+    return_value.__set_status(
+            Status::NotSupported("release_snapshot is not 
implemented").to_thrift());
+}
+
+void BaseBackendService::check_storage_format(TCheckStorageFormatResult& 
result) {
+    LOG(ERROR) << "check_storage_format is not implemented";
+}
+
+void BaseBackendService::ingest_binlog(TIngestBinlogResult& result,
+                                       const TIngestBinlogRequest& request) {
+    LOG(ERROR) << "ingest_binlog is not implemented";
+    result.__set_status(Status::NotSupported("ingest_binlog is not 
implemented").to_thrift());
+}
+
+void BaseBackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                             const TQueryIngestBinlogRequest& 
request) {
+    LOG(ERROR) << "query_ingest_binlog is not implemented";
+    result.__set_status(TIngestBinlogStatus::UNKNOWN);
+    result.__set_err_msg("query_ingest_binlog is not implemented");
+}
+
 void BaseBackendService::pre_cache_async(TPreCacheAsyncResponse& response,
                                          const TPreCacheAsyncRequest& request) 
{
-    LOG(FATAL) << "BackendService is not implemented";
+    LOG(ERROR) << "pre_cache_async is not implemented";
+    response.__set_status(Status::NotSupported("pre_cache_async is not 
implemented").to_thrift());
 }
 
 void BaseBackendService::check_pre_cache(TCheckPreCacheResponse& response,
                                          const TCheckPreCacheRequest& request) 
{
-    LOG(FATAL) << "BackendService is not implemented";
+    LOG(ERROR) << "check_pre_cache is not implemented";
+    response.__set_status(Status::NotSupported("check_pre_cache is not 
implemented").to_thrift());
 }
 
 void BaseBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse& 
response,
                                                const 
TSyncLoadForTabletsRequest& request) {
-    LOG(FATAL) << "BackendService is not implemented";
+    LOG(ERROR) << "sync_load_for_tablets is not implemented";
 }
 
 void 
BaseBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& 
response,
                                                   const 
TGetTopNHotPartitionsRequest& request) {
-    LOG(FATAL) << "BackendService is not implemented";
+    LOG(ERROR) << "get_top_n_hot_partitions is not implemented";
 }
 
 void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
                                          const TWarmUpTabletsRequest& request) 
{
-    LOG(FATAL) << "BackendService is not implemented";
+    LOG(ERROR) << "warm_up_tablets is not implemented";
+    response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 01c1a2abf00..3aaee529735 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -18,7 +18,6 @@
 #pragma once
 
 #include <gen_cpp/BackendService.h>
-#include <stdint.h>
 
 #include <memory>
 #include <string>
@@ -68,11 +67,7 @@ class BaseBackendService : public BackendServiceIf {
 public:
     BaseBackendService(ExecEnv* exec_env);
 
-    ~BaseBackendService() override = default;
-
-    // NOTE: now we do not support multiple backend in one process
-    static Status create_service(ExecEnv* exec_env, int port,
-                                 std::unique_ptr<ThriftServer>* server);
+    ~BaseBackendService() override;
 
     // Agent service
     void submit_tasks(TAgentResult& return_value,
@@ -116,8 +111,32 @@ public:
     // used for external service, close some context and release resource 
related with this context
     void close_scanner(TScanCloseResult& result_, const TScanCloseParams& 
params) override;
 
-    // TODO(AlexYue): The below cloud backend functions should be implemented 
in
-    // CloudBackendService
+    
////////////////////////////////////////////////////////////////////////////
+    // begin local backend functions
+    
////////////////////////////////////////////////////////////////////////////
+    void get_tablet_stat(TTabletStatResult& result) override;
+
+    int64_t get_trash_used_capacity() override;
+
+    void get_stream_load_record(TStreamLoadRecordResult& result,
+                                int64_t last_stream_record_time) override;
+
+    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& 
diskTrashInfos) override;
+
+    void clean_trash() override;
+
+    void make_snapshot(TAgentResult& return_value,
+                       const TSnapshotRequest& snapshot_request) override;
+
+    void release_snapshot(TAgentResult& return_value, const std::string& 
snapshot_path) override;
+
+    void check_storage_format(TCheckStorageFormatResult& result) override;
+
+    void ingest_binlog(TIngestBinlogResult& result, const 
TIngestBinlogRequest& request) override;
+
+    void query_ingest_binlog(TQueryIngestBinlogResult& result,
+                             const TQueryIngestBinlogRequest& request) 
override;
+
     
////////////////////////////////////////////////////////////////////////////
     // begin cloud backend functions
     
////////////////////////////////////////////////////////////////////////////
@@ -137,9 +156,6 @@ public:
     void warm_up_tablets(TWarmUpTabletsResponse& response,
                          const TWarmUpTabletsRequest& request) override;
 
-    
////////////////////////////////////////////////////////////////////////////
-    // end cloud backend functions
-    
////////////////////////////////////////////////////////////////////////////
 protected:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& 
exec_params);
 
@@ -151,9 +167,13 @@ protected:
 // `StorageEngine` mixin for `BaseBackendService`
 class BackendService final : public BaseBackendService {
 public:
+    // NOTE: now we do not support multiple backend in one process
+    static Status create_service(StorageEngine& engine, ExecEnv* exec_env, int 
port,
+                                 std::unique_ptr<ThriftServer>* server);
+
     BackendService(StorageEngine& engine, ExecEnv* exec_env);
 
-    ~BackendService() override = default;
+    ~BackendService() override;
 
     void get_tablet_stat(TTabletStatResult& result) override;
 
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 8bd147800a5..534c7b7b0c6 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -27,6 +27,7 @@
 
 #include <ostream>
 
+#include "cloud/cloud_internal_service.h"
 #include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
@@ -59,13 +60,16 @@ BRpcService::~BRpcService() {
 }
 
 Status BRpcService::start(int port, int num_threads) {
+    // Add service
     if (config::is_cloud_mode()) {
-        // TODO(plat1ko): cloud mode
-        return Status::NotSupported("Currently only support local storage 
engine");
+        _server->AddService(
+                new 
CloudInternalServiceImpl(_exec_env->storage_engine().to_cloud(), _exec_env),
+                brpc::SERVER_OWNS_SERVICE);
+    } else {
+        _server->AddService(
+                new 
PInternalServiceImpl(_exec_env->storage_engine().to_local(), _exec_env),
+                brpc::SERVER_OWNS_SERVICE);
     }
-    // Add service
-    _server->AddService(new 
PInternalServiceImpl(_exec_env->storage_engine().to_local(), _exec_env),
-                        brpc::SERVER_OWNS_SERVICE);
     // start service
     brpc::ServerOptions options;
     if (num_threads != -1) {
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index f013b83e68d..1ecf1a7a0d4 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -39,6 +39,8 @@
 #include <tuple>
 #include <vector>
 
+#include "cloud/cloud_backend_service.h"
+#include "cloud/config.h"
 #include "common/stack_trace.h"
 #include "olap/tablet_schema_cache.h"
 #include "olap/utils.h"
@@ -483,7 +485,7 @@ int main(int argc, char** argv) {
     doris::ThreadLocalHandle::create_thread_local_if_not_exits();
 
     // init exec env
-    auto exec_env(doris::ExecEnv::GetInstance());
+    auto* exec_env(doris::ExecEnv::GetInstance());
     status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, 
broken_paths);
     if (status != Status::OK()) {
         LOG(ERROR) << "failed to init doris storage engine, res=" << status;
@@ -494,8 +496,17 @@ int main(int argc, char** argv) {
     doris::ThriftRpcHelper::setup(exec_env);
     // 1. thrift server with be_port
     std::unique_ptr<doris::ThriftServer> be_server;
-    EXIT_IF_ERROR(
-            doris::BackendService::create_service(exec_env, 
doris::config::be_port, &be_server));
+
+    if (doris::config::is_cloud_mode()) {
+        EXIT_IF_ERROR(doris::CloudBackendService::create_service(
+                exec_env->storage_engine().to_cloud(), exec_env, 
doris::config::be_port,
+                &be_server));
+    } else {
+        
EXIT_IF_ERROR(doris::BackendService::create_service(exec_env->storage_engine().to_local(),
+                                                            exec_env, 
doris::config::be_port,
+                                                            &be_server));
+    }
+
     status = be_server->start();
     if (!status.ok()) {
         LOG(ERROR) << "Doris Be server did not start correctly, exiting";
diff --git a/be/test/agent/task_worker_pool_test.cpp 
b/be/test/agent/task_worker_pool_test.cpp
index 5fbe652dda9..0c7fd88396b 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -41,14 +41,14 @@ TEST(TaskWorkerPoolTest, TaskWorkerPool) {
 
     TAgentTaskRequest task;
     task.__set_signature(-1);
-    workers.submit_task(task);
-    workers.submit_task(task);
-    workers.submit_task(task); // Pending and ignored when stop
+    auto _ = workers.submit_task(task);
+    _ = workers.submit_task(task);
+    _ = workers.submit_task(task); // Pending and ignored when stop
 
     std::this_thread::sleep_for(200ms);
     workers.stop();
 
-    workers.submit_task(task); // Ignore
+    _ = workers.submit_task(task); // Ignore
 
     EXPECT_EQ(count.load(), 2);
 }
@@ -69,13 +69,13 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
     TAgentTaskRequest task;
     task.__set_signature(-1);
     task.__set_priority(TPriority::NORMAL);
-    workers.submit_task(task);
-    workers.submit_task(task);
+    auto _ = workers.submit_task(task);
+    _ = workers.submit_task(task);
     std::this_thread::sleep_for(200ms);
 
     task.__set_priority(TPriority::HIGH);
     // Normal pool is busy, but high prior pool should be idle
-    workers.submit_task(task);
+    _ = workers.submit_task(task);
     std::this_thread::sleep_for(500ms);
     EXPECT_EQ(normal_count.load(), 0);
     EXPECT_EQ(high_prior_count.load(), 1);
@@ -84,8 +84,8 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
     EXPECT_EQ(normal_count.load(), 2);
     EXPECT_EQ(high_prior_count.load(), 1);
     // Both normal and high prior pool are idle
-    workers.submit_task(task);
-    workers.submit_task(task);
+    _ = workers.submit_task(task);
+    _ = workers.submit_task(task);
 
     std::this_thread::sleep_for(500ms);
     EXPECT_EQ(normal_count.load(), 2);
@@ -96,7 +96,7 @@ TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
     EXPECT_EQ(normal_count.load(), 2);
     EXPECT_EQ(high_prior_count.load(), 3);
 
-    workers.submit_task(task); // Ignore
+    _ = workers.submit_task(task); // Ignore
 
     EXPECT_EQ(normal_count.load(), 2);
     EXPECT_EQ(high_prior_count.load(), 3);
@@ -110,7 +110,7 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
     std::atomic_int count {0};
     ReportWorker worker("test", master_info, 1, [&] { ++count; });
 
-    worker.notify(); // Not received heartbeat yet, igonre
+    worker.notify(); // Not received heartbeat yet, ignore
     std::this_thread::sleep_for(100ms);
 
     master_info.network_address.__set_port(9030);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to