This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 478b36edb75 branch-3.0: [opt](http) enable auth token with BE http request (#43659)
478b36edb75 is described below

commit 478b36edb75e669da433c2bcd9d3516ae9f3d41a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 12 16:10:16 2024 +0800

    branch-3.0: [opt](http) enable auth token with BE http request (#43659)
    
    Cherry-picked from #41994
    
    Co-authored-by: Mingyu Chen (Rayner) <morning...@163.com>
    Co-authored-by: morningman <yun...@selectdb.com>
---
 be/src/agent/agent_server.cpp                      | 34 ++++++------
 be/src/agent/agent_server.h                        |  8 +--
 be/src/agent/heartbeat_server.cpp                  | 63 +++++++++++++---------
 be/src/agent/heartbeat_server.h                    | 13 ++---
 be/src/agent/task_worker_pool.cpp                  | 40 +++++++-------
 be/src/agent/task_worker_pool.h                    | 16 +++---
 be/src/agent/utils.cpp                             | 49 ++++++++---------
 be/src/agent/utils.h                               | 10 ++--
 be/src/cloud/cloud_meta_mgr.cpp                    |  2 +-
 be/src/common/utils.h                              |  3 +-
 .../schema_active_queries_scanner.cpp              |  2 +-
 .../schema_scanner/schema_partitions_scanner.cpp   |  2 +-
 .../exec/schema_scanner/schema_routine_scanner.cpp |  4 +-
 .../schema_table_options_scanner.cpp               |  2 +-
 .../schema_table_properties_scanner.cpp            |  2 +-
 .../schema_workload_group_privileges.cpp           |  4 +-
 .../schema_workload_groups_scanner.cpp             |  4 +-
 .../schema_workload_sched_policy_scanner.cpp       |  4 +-
 be/src/http/action/compaction_score_action.cpp     |  2 +-
 be/src/http/action/http_stream.cpp                 |  8 +--
 be/src/http/action/stream_load.cpp                 |  2 +-
 be/src/http/http_client.cpp                        |  4 ++
 be/src/http/http_client.h                          |  8 +++
 be/src/http/http_common.h                          |  2 +-
 be/src/http/http_handler_with_auth.cpp             | 19 ++++++-
 be/src/http/http_headers.cpp                       |  3 +-
 be/src/http/http_headers.h                         |  3 +-
 be/src/http/utils.cpp                              | 11 +++-
 be/src/io/fs/multi_table_pipe.cpp                  |  2 +-
 be/src/olap/olap_server.cpp                        |  6 +--
 be/src/olap/task/engine_clone_task.cpp             |  6 +--
 be/src/olap/task/engine_clone_task.h               |  6 +--
 be/src/olap/wal/wal_manager.cpp                    |  6 +--
 be/src/olap/wal/wal_table.cpp                      | 14 +++--
 be/src/runtime/cluster_info.h                      | 48 +++++++++++++++++
 be/src/runtime/exec_env.cpp                        | 15 ++++--
 be/src/runtime/exec_env.h                          | 12 +++--
 be/src/runtime/exec_env_init.cpp                   |  6 +--
 be/src/runtime/fragment_mgr.cpp                    |  4 +-
 be/src/runtime/group_commit_mgr.cpp                | 26 +++++----
 .../routine_load/routine_load_task_executor.cpp    |  2 +
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  6 +--
 be/src/runtime/small_file_mgr.cpp                  |  6 +--
 be/src/runtime/snapshot_loader.cpp                 |  2 +-
 .../runtime/stream_load/stream_load_executor.cpp   | 12 ++---
 .../workload_group/workload_group_manager.cpp      |  2 +-
 be/src/service/backend_service.cpp                 |  2 +-
 be/src/service/doris_main.cpp                      |  8 ++-
 be/src/service/internal_service.cpp                |  2 +-
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  2 +-
 be/src/vec/sink/autoinc_buffer.cpp                 |  4 +-
 be/src/vec/sink/vrow_distribution.cpp              |  4 +-
 be/test/agent/heartbeat_server_test.cpp            | 12 ++---
 be/test/agent/mock_utils.h                         |  2 +-
 be/test/agent/task_worker_pool_test.cpp            |  7 +--
 be/test/http/http_client_test.cpp                  | 34 +++++++++++-
 be/test/olap/wal/wal_manager_test.cpp              | 12 ++---
 .../runtime/routine_load_task_executor_test.cpp    |  6 +--
 be/test/vec/exec/vfile_scanner_exception_test.cpp  | 13 ++---
 be/test/vec/exec/vwal_scanner_test.cpp             | 13 ++---
 .../main/java/org/apache/doris/catalog/Env.java    |  8 +++
 .../{load/loadv2 => catalog}/TokenManager.java     | 10 +++-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  2 +-
 .../org/apache/doris/load/loadv2/LoadManager.java  |  9 +---
 .../apache/doris/load/loadv2/MysqlLoadManager.java |  6 +--
 .../doris/load/sync/canal/CanalSyncChannel.java    |  2 +-
 .../trees/plans/commands/insert/InsertUtils.java   |  2 +-
 .../org/apache/doris/plugin/audit/AuditLoader.java |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  4 +-
 .../apache/doris/service/FrontendServiceImpl.java  | 18 +++----
 .../java/org/apache/doris/system/HeartbeatMgr.java |  1 +
 .../apache/doris/transaction/TransactionEntry.java |  2 +-
 .../apache/doris/load/loadv2/TokenManagerTest.java |  1 +
 gensrc/thrift/FrontendService.thrift               | 24 ++++-----
 gensrc/thrift/HeartbeatService.thrift              |  1 +
 75 files changed, 442 insertions(+), 276 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 361a8ab93a9..0b17f3782e7 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -49,9 +49,9 @@ using std::vector;
 
 namespace doris {
 
-AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
-        : _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
-    MasterServerClient::create(master_info);
+AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info)
+        : _cluster_info(cluster_info), _topic_subscriber(new 
TopicSubscriber()) {
+    MasterServerClient::create(cluster_info);
 
 #if !defined(BE_TEST) && !defined(__APPLE__)
     // Add subscriber here and register listeners
@@ -170,7 +170,7 @@ void AgentServer::start_workers(StorageEngine& engine, 
ExecEnv* exec_env) {
             "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& 
task) { return alter_tablet_callback(engine, task); });
 
     _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
-            "CLONE", config::clone_worker_count, [&engine, &master_info = 
_master_info](auto&& task) { return clone_callback(engine, master_info, task); 
});
+            "CLONE", config::clone_worker_count, [&engine, &cluster_info = 
_cluster_info](auto&& task) { return clone_callback(engine, cluster_info, 
task); });
 
     _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = 
std::make_unique<TaskWorkerPool>(
             "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, 
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); 
});
@@ -188,13 +188,13 @@ void AgentServer::start_workers(StorageEngine& engine, 
ExecEnv* exec_env) {
             "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return 
visible_version_callback(engine, task); });
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_TASK", _master_info, config::report_task_interval_seconds, 
[&master_info = _master_info] { report_task_callback(master_info); }));
+            "REPORT_TASK", _cluster_info, 
config::report_task_interval_seconds, [&cluster_info = _cluster_info] { 
report_task_callback(cluster_info); }));
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds, [&engine, &master_info = 
_master_info] { report_disk_callback(engine, master_info); }));
+            "REPORT_DISK_STATE", _cluster_info, 
config::report_disk_state_interval_seconds, [&engine, &cluster_info = 
_cluster_info] { report_disk_callback(engine, cluster_info); }));
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_OLAP_TABLET", _master_info, 
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { 
report_tablet_callback(engine, master_info); }));
+            "REPORT_OLAP_TABLET", _cluster_info, 
config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info] 
{ report_tablet_callback(engine, cluster_info); }));
     // clang-format on
 }
 
@@ -217,18 +217,20 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
             "DROP_TABLE", 1, [&engine](auto&& task) { return 
drop_tablet_callback(engine, task); });
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_TASK", _master_info, config::report_task_interval_seconds,
-            [&master_info = _master_info] { report_task_callback(master_info); 
}));
+            "REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
+            [&cluster_info = _cluster_info] { 
report_task_callback(cluster_info); }));
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds,
-            [&engine, &master_info = _master_info] { 
report_disk_callback(engine, master_info); }));
+            "REPORT_DISK_STATE", _cluster_info, 
config::report_disk_state_interval_seconds,
+            [&engine, &cluster_info = _cluster_info] {
+                report_disk_callback(engine, cluster_info);
+            }));
 
     if (config::enable_cloud_tablet_report) {
         _report_workers.push_back(std::make_unique<ReportWorker>(
-                "REPORT_OLAP_TABLET", _master_info, 
config::report_tablet_interval_seconds,
-                [&engine, &master_info = _master_info] {
-                    report_tablet_callback(engine, master_info);
+                "REPORT_OLAP_TABLET", _cluster_info, 
config::report_tablet_interval_seconds,
+                [&engine, &cluster_info = _cluster_info] {
+                    report_tablet_callback(engine, cluster_info);
                 }));
     }
 }
@@ -239,8 +241,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                                const std::vector<TAgentTaskRequest>& tasks) {
     Status ret_st;
 
-    // TODO check master_info here if it is the same with that of heartbeat rpc
-    if (_master_info.network_address.hostname.empty() || 
_master_info.network_address.port == 0) {
+    // TODO check cluster_info here if it is the same with that of heartbeat 
rpc
+    if (_cluster_info->master_fe_addr.hostname.empty() || 
_cluster_info->master_fe_addr.port == 0) {
         Status ret_st = Status::Cancelled("Have not get FE Master heartbeat 
yet");
         ret_st.to_thrift(&agent_result.status);
         return;
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 5a7fbafb72e..e5b5b522ba0 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -35,7 +35,7 @@ class ExecEnv;
 class TAgentPublishRequest;
 class TAgentResult;
 class TAgentTaskRequest;
-class TMasterInfo;
+class ClusterInfo;
 class TSnapshotRequest;
 class StorageEngine;
 class CloudStorageEngine;
@@ -43,7 +43,7 @@ class CloudStorageEngine;
 // Each method corresponds to one RPC from FE Master, see BackendService.
 class AgentServer {
 public:
-    explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
+    explicit AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info);
     ~AgentServer();
 
     void start_workers(StorageEngine& engine, ExecEnv* exec_env);
@@ -63,8 +63,8 @@ public:
     void stop_report_workers();
 
 private:
-    // Reference to the ExecEnv::_master_info
-    const TMasterInfo& _master_info;
+    // Reference to the ExecEnv::_cluster_info
+    const ClusterInfo* _cluster_info;
 
     std::unordered_map<int64_t /* TTaskType */, 
std::unique_ptr<TaskWorkerPoolIf>> _workers;
 
diff --git a/be/src/agent/heartbeat_server.cpp 
b/be/src/agent/heartbeat_server.cpp
index 78002ed08fe..11345ea06f0 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -31,6 +31,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "olap/storage_engine.h"
+#include "runtime/cluster_info.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/heartbeat_flags.h"
@@ -49,15 +50,15 @@ class TProcessor;
 
 namespace doris {
 
-HeartbeatServer::HeartbeatServer(TMasterInfo* master_info)
+HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info)
         : _engine(ExecEnv::GetInstance()->storage_engine()),
-          _master_info(master_info),
+          _cluster_info(cluster_info),
           _fe_epoch(0) {
     _be_epoch = GetCurrentTimeMicros() / 1000;
 }
 
 void HeartbeatServer::init_cluster_id() {
-    _master_info->cluster_id = _engine.effective_cluster_id();
+    _cluster_info->cluster_id = _engine.effective_cluster_id();
 }
 
 void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
@@ -65,7 +66,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& 
heartbeat_result,
     //print heartbeat in every minute
     LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
                           << "host:" << master_info.network_address.hostname
-                          << ", port:" << master_info.network_address.port
+                          << ", rpc port:" << master_info.network_address.port
                           << ", cluster id:" << master_info.cluster_id
                           << ", frontend_info:" << 
PrintFrontendInfos(master_info.frontend_infos)
                           << ", counter:" << google::COUNTER << ", BE start 
time: " << _be_epoch;
@@ -108,22 +109,23 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
     std::lock_guard<std::mutex> lk(_hb_mtx);
 
     // Check cluster id
-    if (_master_info->cluster_id == -1) {
+    if (_cluster_info->cluster_id == -1) {
         LOG(INFO) << "get first heartbeat. update cluster id";
         // write and update cluster id
         RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id));
 
-        _master_info->cluster_id = master_info.cluster_id;
+        _cluster_info->cluster_id = master_info.cluster_id;
         LOG(INFO) << "record cluster id. host: " << 
master_info.network_address.hostname
                   << ". port: " << master_info.network_address.port
                   << ". cluster id: " << master_info.cluster_id
                   << ". frontend_infos: " << 
PrintFrontendInfos(master_info.frontend_infos);
     } else {
-        if (_master_info->cluster_id != master_info.cluster_id) {
+        if (_cluster_info->cluster_id != master_info.cluster_id) {
             return Status::InternalError(
                     "invalid cluster id. ignore. Record cluster id ={}, record 
frontend info {}. "
                     "Invalid cluster_id={}, invalid frontend info {}",
-                    _master_info->cluster_id, 
PrintFrontendInfos(_master_info->frontend_infos),
+                    _cluster_info->cluster_id,
+                    
PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()),
                     master_info.cluster_id, 
PrintFrontendInfos(master_info.frontend_infos));
         }
     }
@@ -183,22 +185,22 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
     }
 
     bool need_report = false;
-    if (_master_info->network_address.hostname != 
master_info.network_address.hostname ||
-        _master_info->network_address.port != 
master_info.network_address.port) {
+    if (_cluster_info->master_fe_addr.hostname != 
master_info.network_address.hostname ||
+        _cluster_info->master_fe_addr.port != 
master_info.network_address.port) {
         if (master_info.epoch > _fe_epoch) {
-            _master_info->network_address.hostname = 
master_info.network_address.hostname;
-            _master_info->network_address.port = 
master_info.network_address.port;
+            _cluster_info->master_fe_addr.hostname = 
master_info.network_address.hostname;
+            _cluster_info->master_fe_addr.port = 
master_info.network_address.port;
             _fe_epoch = master_info.epoch;
             need_report = true;
             LOG(INFO) << "master change. new master host: "
-                      << _master_info->network_address.hostname
-                      << ". port: " << _master_info->network_address.port
+                      << _cluster_info->master_fe_addr.hostname
+                      << ". port: " << _cluster_info->master_fe_addr.port
                       << ". epoch: " << _fe_epoch;
         } else {
             return Status::InternalError(
                     "epoch is not greater than local. ignore heartbeat. host: 
{}, port: {}, local "
                     "epoch: {}, received epoch: {}",
-                    _master_info->network_address.hostname, 
_master_info->network_address.port,
+                    _cluster_info->master_fe_addr.hostname, 
_cluster_info->master_fe_addr.port,
                     _fe_epoch, master_info.epoch);
         }
     } else {
@@ -211,16 +213,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
     }
 
     if (master_info.__isset.token) {
-        if (!_master_info->__isset.token) {
-            _master_info->__set_token(master_info.token);
-            LOG(INFO) << "get token. token: " << _master_info->token;
-        } else if (_master_info->token != master_info.token) {
-            return Status::InternalError("invalid token");
+        if (_cluster_info->token == "") {
+            _cluster_info->token = master_info.token;
+            LOG(INFO) << "get token. token: " << _cluster_info->token;
+        } else if (_cluster_info->token != master_info.token) {
+            return Status::InternalError("invalid token. local: {}, master: 
{}",
+                                         _cluster_info->token, 
master_info.token);
         }
     }
 
     if (master_info.__isset.http_port) {
-        _master_info->__set_http_port(master_info.http_port);
+        _cluster_info->master_fe_http_port = master_info.http_port;
     }
 
     if (master_info.__isset.heartbeat_flags) {
@@ -229,7 +232,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
     }
 
     if (master_info.__isset.backend_id) {
-        _master_info->__set_backend_id(master_info.backend_id);
+        _cluster_info->backend_id = master_info.backend_id;
         BackendOptions::set_backend_id(master_info.backend_id);
     }
     if (master_info.__isset.frontend_infos) {
@@ -281,6 +284,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
                 master_info.tablet_report_inactive_duration_ms;
     }
 
+    if (master_info.__isset.auth_token) {
+        if (_cluster_info->curr_auth_token == "") {
+            _cluster_info->curr_auth_token = master_info.auth_token;
+            LOG(INFO) << "set new auth token: " << master_info.auth_token;
+        } else if (_cluster_info->curr_auth_token != master_info.auth_token) {
+            LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token
+                      << "set new auth token: " << master_info.auth_token;
+            _cluster_info->last_auth_token = _cluster_info->curr_auth_token;
+            _cluster_info->curr_auth_token = master_info.auth_token;
+        }
+    }
+
     if (need_report) {
         LOG(INFO) << "Master FE is changed or restarted. report tablet and 
disk info immediately";
         _engine.notify_listeners();
@@ -291,8 +306,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
 
 Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
                                std::unique_ptr<ThriftServer>* thrift_server,
-                               uint32_t worker_thread_num, TMasterInfo* 
local_master_info) {
-    HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info);
+                               uint32_t worker_thread_num, ClusterInfo* 
cluster_info) {
+    HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
     if (heartbeat_server == nullptr) {
         return Status::InternalError("Get heartbeat server failed");
     }
diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h
index 29fecd07881..67d67fd486d 100644
--- a/be/src/agent/heartbeat_server.h
+++ b/be/src/agent/heartbeat_server.h
@@ -26,6 +26,7 @@
 #include "common/status.h"
 
 namespace doris {
+class ClusterInfo;
 class ExecEnv;
 class THeartbeatResult;
 class TMasterInfo;
@@ -36,7 +37,7 @@ class ThriftServer;
 
 class HeartbeatServer : public HeartbeatServiceIf {
 public:
-    explicit HeartbeatServer(TMasterInfo* master_info);
+    explicit HeartbeatServer(ClusterInfo* cluster_info);
     ~HeartbeatServer() override = default;
 
     void init_cluster_id();
@@ -44,7 +45,7 @@ public:
     // Master send heartbeat to this server
     //
     // Input parameters:
-    // * master_info: The struct of master info, contains host ip and port
+    // * master_info: The struct of master info, contains cluster info from 
Master FE
     //
     // Output parameters:
     // * heartbeat_result: The result of heartbeat set
@@ -56,10 +57,10 @@ private:
     BaseStorageEngine& _engine;
     int64_t _be_epoch;
 
-    // mutex to protect master_info and _epoch
+    // mutex to protect cluster_info and _epoch
     std::mutex _hb_mtx;
-    // Not owned. Point to the ExecEnv::_master_info
-    TMasterInfo* _master_info = nullptr;
+    // Not owned. Point to the ExecEnv::_cluster_info
+    ClusterInfo* _cluster_info = nullptr;
     int64_t _fe_epoch;
 
     DISALLOW_COPY_AND_ASSIGN(HeartbeatServer);
@@ -67,5 +68,5 @@ private:
 
 Status create_heartbeat_server(ExecEnv* exec_env, uint32_t 
heartbeat_server_port,
                                std::unique_ptr<ThriftServer>* 
heart_beat_server,
-                               uint32_t worker_thread_num, TMasterInfo* 
local_master_info);
+                               uint32_t worker_thread_num, ClusterInfo* 
cluster_info);
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 4614304e439..01b107b3ea7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -378,22 +378,22 @@ Status check_migrate_request(StorageEngine& engine, const 
TStorageMediumMigrateR
 }
 
 // Return `true` if report success
-bool handle_report(const TReportRequest& request, const TMasterInfo& 
master_info,
+bool handle_report(const TReportRequest& request, const ClusterInfo* 
cluster_info,
                    std::string_view name) {
     TMasterResult result;
     Status status = MasterServerClient::instance()->report(request, &result);
     if (!status.ok()) [[unlikely]] {
         LOG_WARNING("failed to report {}", name)
-                .tag("host", master_info.network_address.hostname)
-                .tag("port", master_info.network_address.port)
+                .tag("host", cluster_info->master_fe_addr.hostname)
+                .tag("port", cluster_info->master_fe_addr.port)
                 .error(status);
         return false;
     }
 
     else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
         LOG_WARNING("failed to report {}", name)
-                .tag("host", master_info.network_address.hostname)
-                .tag("port", master_info.network_address.port)
+                .tag("host", cluster_info->master_fe_addr.hostname)
+                .tag("port", cluster_info->master_fe_addr.port)
                 .error(result.status);
         return false;
     }
@@ -663,10 +663,10 @@ void PriorTaskWorkerPool::high_prior_loop() {
     }
 }
 
-ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, 
int report_interval_s,
+ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, 
int report_interval_s,
                            std::function<void()> callback)
         : _name(std::move(name)) {
-    auto report_loop = [this, &master_info, report_interval_s, callback = 
std::move(callback)] {
+    auto report_loop = [this, cluster_info, report_interval_s, callback = 
std::move(callback)] {
         auto& engine = ExecEnv::GetInstance()->storage_engine();
         engine.register_report_listener(this);
         while (true) {
@@ -685,7 +685,7 @@ ReportWorker::ReportWorker(std::string name, const 
TMasterInfo& master_info, int
                 }
             }
 
-            if (master_info.network_address.port == 0) {
+            if (cluster_info->master_fe_addr.port == 0) {
                 // port == 0 means not received heartbeat yet
                 LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing report";
                 continue;
@@ -990,7 +990,7 @@ void check_consistency_callback(StorageEngine& engine, 
const TAgentTaskRequest&
     remove_task_info(req.task_type, req.signature);
 }
 
-void report_task_callback(const TMasterInfo& master_info) {
+void report_task_callback(const ClusterInfo* cluster_info) {
     TReportRequest request;
     if (config::report_random_wait) {
         random_sleep(5);
@@ -1007,14 +1007,14 @@ void report_task_callback(const TMasterInfo& 
master_info) {
         }
     }
     request.__set_backend(BackendOptions::get_local_backend());
-    bool succ = handle_report(request, master_info, "task");
+    bool succ = handle_report(request, cluster_info, "task");
     report_task_total << 1;
     if (!succ) [[unlikely]] {
         report_task_failed << 1;
     }
 }
 
-void report_disk_callback(StorageEngine& engine, const TMasterInfo& 
master_info) {
+void report_disk_callback(StorageEngine& engine, const ClusterInfo* 
cluster_info) {
     TReportRequest request;
     request.__set_backend(BackendOptions::get_local_backend());
     request.__isset.disks = true;
@@ -1039,14 +1039,14 @@ void report_disk_callback(StorageEngine& engine, const 
TMasterInfo& master_info)
     request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
                                                  ? 
config::pipeline_executor_size
                                                  : CpuInfo::num_cores());
-    bool succ = handle_report(request, master_info, "disk");
+    bool succ = handle_report(request, cluster_info, "disk");
     report_disk_total << 1;
     if (!succ) [[unlikely]] {
         report_disk_failed << 1;
     }
 }
 
-void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info) {
+void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* 
cluster_info) {
     // Random sleep 1~5 seconds before doing report.
     // In order to avoid the problem that the FE receives many report requests 
at the same time
     // and can not be processed.
@@ -1066,12 +1066,12 @@ void report_disk_callback(CloudStorageEngine& engine, 
const TMasterInfo& master_
     request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
                                                  ? 
config::pipeline_executor_size
                                                  : CpuInfo::num_cores());
-    bool succ = handle_report(request, master_info, "disk");
+    bool succ = handle_report(request, cluster_info, "disk");
     report_disk_total << 1;
     report_disk_failed << !succ;
 }
 
-void report_tablet_callback(StorageEngine& engine, const TMasterInfo& 
master_info) {
+void report_tablet_callback(StorageEngine& engine, const ClusterInfo* 
cluster_info) {
     if (config::report_random_wait) {
         random_sleep(5);
     }
@@ -1133,14 +1133,14 @@ void report_tablet_callback(StorageEngine& engine, 
const TMasterInfo& master_inf
     }
     request.__isset.resource = true;
 
-    bool succ = handle_report(request, master_info, "tablet");
+    bool succ = handle_report(request, cluster_info, "tablet");
     report_tablet_total << 1;
     if (!succ) [[unlikely]] {
         report_tablet_failed << 1;
     }
 }
 
-void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info) {
+void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* 
cluster_info) {
     // Random sleep 1~5 seconds before doing report.
     // In order to avoid the problem that the FE receives many report requests 
at the same time
     // and can not be processed.
@@ -1173,7 +1173,7 @@ void report_tablet_callback(CloudStorageEngine& engine, 
const TMasterInfo& maste
     request.__set_report_version(report_version);
     request.__set_num_tablets(total_num_tablets);
 
-    bool succ = handle_report(request, master_info, "tablet");
+    bool succ = handle_report(request, cluster_info, "tablet");
     report_tablet_total << 1;
     if (!succ) [[unlikely]] {
         report_tablet_failed << 1;
@@ -2013,7 +2013,7 @@ void visible_version_callback(StorageEngine& engine, 
const TAgentTaskRequest& re
             visible_version_req.partition_version);
 }
 
-void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
+void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
                     const TAgentTaskRequest& req) {
     const auto& clone_req = req.clone_req;
 
@@ -2021,7 +2021,7 @@ void clone_callback(StorageEngine& engine, const 
TMasterInfo& master_info,
     LOG(INFO) << "get clone task. signature=" << req.signature;
 
     std::vector<TTabletInfo> tablet_infos;
-    EngineCloneTask engine_task(engine, clone_req, master_info, req.signature, 
&tablet_infos);
+    EngineCloneTask engine_task(engine, clone_req, cluster_info, 
req.signature, &tablet_infos);
     SCOPED_ATTACH_TASK(engine_task.mem_tracker());
     auto status = engine_task.execute();
     // Return result to fe
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index c50ac57ffe9..f6223affd07 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -36,10 +36,10 @@ class StorageEngine;
 class CloudStorageEngine;
 class Thread;
 class ThreadPool;
-class TMasterInfo;
 class TReportRequest;
 class TTabletInfo;
 class TAgentTaskRequest;
+class ClusterInfo;
 
 class TaskWorkerPoolIf {
 public:
@@ -109,7 +109,7 @@ private:
 
 class ReportWorker {
 public:
-    ReportWorker(std::string name, const TMasterInfo& master_info, int 
report_interval_s,
+    ReportWorker(std::string name, const ClusterInfo* cluster_info, int 
report_interval_s,
                  std::function<void()> callback);
 
     ~ReportWorker();
@@ -169,7 +169,7 @@ void alter_tablet_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
 
 void alter_cloud_tablet_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
 
-void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
+void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
                     const TAgentTaskRequest& req);
 
 void storage_medium_migrate_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
@@ -182,15 +182,15 @@ void clean_udf_cache_callback(const TAgentTaskRequest& 
req);
 
 void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& 
req);
 
-void report_task_callback(const TMasterInfo& master_info);
+void report_task_callback(const ClusterInfo* cluster_info);
 
-void report_disk_callback(StorageEngine& engine, const TMasterInfo& 
master_info);
+void report_disk_callback(StorageEngine& engine, const ClusterInfo* 
cluster_info);
 
-void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info);
+void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* 
cluster_info);
 
-void report_tablet_callback(StorageEngine& engine, const TMasterInfo& 
master_info);
+void report_tablet_callback(StorageEngine& engine, const ClusterInfo* 
cluster_info);
 
-void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info);
+void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* 
cluster_info);
 
 void calc_delete_bitmap_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
 
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 71e7aedbc35..2a1f9994b01 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -43,6 +43,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "runtime/client_cache.h"
+#include "runtime/cluster_info.h"
 
 namespace doris {
 class TConfirmUnusedRemoteFilesRequest;
@@ -61,8 +62,8 @@ namespace doris {
 static FrontendServiceClientCache s_client_cache;
 static std::unique_ptr<MasterServerClient> s_client;
 
-MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info) {
-    s_client.reset(new MasterServerClient(master_info));
+MasterServerClient* MasterServerClient::create(const ClusterInfo* cluster_info) {
+    s_client.reset(new MasterServerClient(cluster_info));
     return s_client.get();
 }
 
@@ -70,18 +71,18 @@ MasterServerClient* MasterServerClient::instance() {
     return s_client.get();
 }
 
-MasterServerClient::MasterServerClient(const TMasterInfo& master_info)
-        : _master_info(master_info) {}
+MasterServerClient::MasterServerClient(const ClusterInfo* cluster_info)
+        : _cluster_info(cluster_info) {}
 
 Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMasterResult* result) {
     Status client_status;
-    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
+    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                      config::thrift_rpc_timeout_ms, 
&client_status);
 
     if (!client_status.ok()) {
         LOG(WARNING) << "fail to get master client from cache. "
-                     << "host=" << _master_info.network_address.hostname
-                     << ", port=" << _master_info.network_address.port
+                     << "host=" << _cluster_info->master_fe_addr.hostname
+                     << ", port=" << _cluster_info->master_fe_addr.port
                      << ", code=" << client_status.code();
         return Status::InternalError("Failed to get master client");
     }
@@ -97,8 +98,8 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
             if (!client_status.ok()) {
 #ifdef ADDRESS_SANITIZER
                 LOG(WARNING) << "fail to get master client from cache. "
-                             << "host=" << 
_master_info.network_address.hostname
-                             << ", port=" << _master_info.network_address.port
+                             << "host=" << 
_cluster_info->master_fe_addr.hostname
+                             << ", port=" << _cluster_info->master_fe_addr.port
                              << ", code=" << client_status.code();
 #endif
                 return Status::RpcError("Master client finish task failed");
@@ -108,8 +109,8 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
     } catch (std::exception& e) {
         RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
         LOG(WARNING) << "fail to finish_task. "
-                     << "host=" << _master_info.network_address.hostname
-                     << ", port=" << _master_info.network_address.port << ", 
error=" << e.what();
+                     << "host=" << _cluster_info->master_fe_addr.hostname
+                     << ", port=" << _cluster_info->master_fe_addr.port << ", 
error=" << e.what();
         return Status::InternalError("Fail to finish task");
     }
 
@@ -118,13 +119,13 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
 
 Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
     Status client_status;
-    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
+    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                      config::thrift_rpc_timeout_ms, 
&client_status);
 
     if (!client_status.ok()) {
         LOG(WARNING) << "fail to get master client from cache. "
-                     << "host=" << _master_info.network_address.hostname
-                     << ", port=" << _master_info.network_address.port
+                     << "host=" << _cluster_info->master_fe_addr.hostname
+                     << ", port=" << _cluster_info->master_fe_addr.port
                      << ", code=" << client_status;
         return Status::InternalError("Fail to get master client from cache");
     }
@@ -144,8 +145,8 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
                 if (!client_status.ok()) {
 #ifdef ADDRESS_SANITIZER
                     LOG(WARNING) << "fail to get master client from cache. "
-                                 << "host=" << 
_master_info.network_address.hostname
-                                 << ", port=" << 
_master_info.network_address.port
+                                 << "host=" << 
_cluster_info->master_fe_addr.hostname
+                                 << ", port=" << 
_cluster_info->master_fe_addr.port
                                  << ", code=" << client_status.code();
 #endif
                     return Status::InternalError("Fail to get master client 
from cache");
@@ -164,8 +165,8 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
     } catch (std::exception& e) {
         RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
         LOG(WARNING) << "fail to report to master. "
-                     << "host=" << _master_info.network_address.hostname
-                     << ", port=" << _master_info.network_address.port
+                     << "host=" << _cluster_info->master_fe_addr.hostname
+                     << ", port=" << _cluster_info->master_fe_addr.port
                      << ", code=" << client_status.code() << ", reason=" << 
e.what();
         return Status::InternalError("Fail to report to master");
     }
@@ -176,13 +177,13 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
 Status MasterServerClient::confirm_unused_remote_files(
         const TConfirmUnusedRemoteFilesRequest& request, 
TConfirmUnusedRemoteFilesResult* result) {
     Status client_status;
-    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
+    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                      config::thrift_rpc_timeout_ms, 
&client_status);
 
     if (!client_status.ok()) {
         return Status::InternalError(
                 "fail to get master client from cache. host={}, port={}, 
code={}",
-                _master_info.network_address.hostname, 
_master_info.network_address.port,
+                _cluster_info->master_fe_addr.hostname, 
_cluster_info->master_fe_addr.port,
                 client_status.code());
     }
     try {
@@ -198,8 +199,8 @@ Status MasterServerClient::confirm_unused_remote_files(
                 if (!client_status.ok()) {
                     return Status::InternalError(
                             "fail to get master client from cache. host={}, 
port={}, code={}",
-                            _master_info.network_address.hostname,
-                            _master_info.network_address.port, 
client_status.code());
+                            _cluster_info->master_fe_addr.hostname,
+                            _cluster_info->master_fe_addr.port, 
client_status.code());
                 }
 
                 client->confirmUnusedRemoteFiles(*result, request);
@@ -208,7 +209,7 @@ Status MasterServerClient::confirm_unused_remote_files(
                 // actually we don't care what FE returns.
                 return Status::InternalError(
                         "fail to confirm unused remote files. host={}, 
port={}, code={}, reason={}",
-                        _master_info.network_address.hostname, 
_master_info.network_address.port,
+                        _cluster_info->master_fe_addr.hostname, 
_cluster_info->master_fe_addr.port,
                         client_status.code(), e.what());
             }
         }
@@ -216,7 +217,7 @@ Status MasterServerClient::confirm_unused_remote_files(
         RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
         return Status::InternalError(
                 "fail to confirm unused remote files. host={}, port={}, 
code={}, reason={}",
-                _master_info.network_address.hostname, 
_master_info.network_address.port,
+                _cluster_info->master_fe_addr.hostname, 
_cluster_info->master_fe_addr.port,
                 client_status.code(), e.what());
     }
 
diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h
index eea8f9f8cff..d554e92ff67 100644
--- a/be/src/agent/utils.h
+++ b/be/src/agent/utils.h
@@ -28,13 +28,13 @@ namespace doris {
 class TConfirmUnusedRemoteFilesRequest;
 class TConfirmUnusedRemoteFilesResult;
 class TFinishTaskRequest;
-class TMasterInfo;
 class TMasterResult;
 class TReportRequest;
+class ClusterInfo;
 
 class MasterServerClient {
 public:
-    static MasterServerClient* create(const TMasterInfo& master_info);
+    static MasterServerClient* create(const ClusterInfo* cluster_info);
     static MasterServerClient* instance();
 
     ~MasterServerClient() = default;
@@ -61,12 +61,12 @@ public:
                                        TConfirmUnusedRemoteFilesResult* 
result);
 
 private:
-    MasterServerClient(const TMasterInfo& master_info);
+    MasterServerClient(const ClusterInfo* cluster_info);
 
     DISALLOW_COPY_AND_ASSIGN(MasterServerClient);
 
-    // Not owner. Reference to the ExecEnv::_master_info
-    const TMasterInfo& _master_info;
+    // Not owner. Reference to the ExecEnv::_cluster_info
+    const ClusterInfo* _cluster_info;
 };
 
 class AgentUtils {
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 026610d5b0c..fefdb65e44b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -837,7 +837,7 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
                 Status status;
                 int64_t duration_ns = 0;
                 TNetworkAddress master_addr =
-                        ExecEnv::GetInstance()->master_info()->network_address;
+                        ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
                 if (master_addr.hostname.empty() || master_addr.port == 0) {
                     status = Status::Error<SERVICE_UNAVAILABLE>(
                             "Have not get FE Master heartbeat yet");
diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index 46df44a40f2..92c5974e4a7 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -31,13 +31,14 @@ struct AuthInfo {
     std::string cluster;
     std::string user_ip;
     // -1 as unset
-    int64_t auth_code = -1;
+    int64_t auth_code = -1; // deprecated
     std::string token;
 };
 
 template <class T>
 void set_request_auth(T* req, const AuthInfo& auth) {
     req->user = auth.user; // always set user, because it may be used by FE
+    // auth code is deprecated and should be removed in 3.1
     if (auth.auth_code != -1) {
         // if auth_code is set, no need to set other info
         req->__set_auth_code(auth.auth_code);
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index 6f66d9a8fd5..98051638026 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -51,7 +51,7 @@ Status SchemaActiveQueriesScanner::start(RuntimeState* state) {
 }
 
 Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
index ebe2bd3b70e..459715fd628 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -98,7 +98,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
 }
 
 Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
index e8d95f0abd6..8660d75e8a1 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -62,7 +62,7 @@ Status SchemaRoutinesScanner::start(RuntimeState* state) {
 }
 
 Status SchemaRoutinesScanner::get_block_from_fe() {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
         schema_table_request_params.__isset.columns_name = true;
@@ -138,4 +138,4 @@ Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block,
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
index f4b636be68f..bb778996a83 100644
--- a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
@@ -70,7 +70,7 @@ Status SchemaTableOptionsScanner::start(RuntimeState* state) {
 }
 
 Status SchemaTableOptionsScanner::get_onedb_info_from_fe(int64_t dbId) {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
index 749113da1b5..8d6a26a552f 100644
--- a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
@@ -67,7 +67,7 @@ Status SchemaTablePropertiesScanner::start(RuntimeState* state) {
 }
 
 Status SchemaTablePropertiesScanner::get_onedb_info_from_fe(int64_t dbId) {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
index a1d4568d905..a91a28322ec 100644
--- a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
@@ -45,7 +45,7 @@ Status SchemaWorkloadGroupPrivilegesScanner::start(RuntimeState* state) {
 }
 
 Status SchemaWorkloadGroupPrivilegesScanner::_get_workload_group_privs_block_from_fe() {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -134,4 +134,4 @@ Status SchemaWorkloadGroupPrivilegesScanner::get_next_block_internal(vectorized:
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index dd81a3ecb26..43562a8f52c 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -58,7 +58,7 @@ Status SchemaWorkloadGroupsScanner::start(RuntimeState* state) {
 }
 
 Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -144,4 +144,4 @@ Status SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* b
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
index 2d91f151f5f..5c6a6f70a88 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -49,7 +49,7 @@ Status SchemaWorkloadSchedulePolicyScanner::start(RuntimeState* state) {
 }
 
 Status SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_from_fe() {
-    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
 
     TSchemaTableRequestParams schema_table_request_params;
     for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -135,4 +135,4 @@ Status SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized::
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/http/action/compaction_score_action.cpp b/be/src/http/action/compaction_score_action.cpp
index 10b8cc6bdba..eeb0d0a642c 100644
--- a/be/src/http/action/compaction_score_action.cpp
+++ b/be/src/http/action/compaction_score_action.cpp
@@ -166,7 +166,7 @@ CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::
           
_accessor(std::make_unique<CloudCompactionScoresAccessor>(tablet_mgr)) {}
 
 void CompactionScoreAction::handle(HttpRequest* req) {
-    req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data());
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JSON_TYPE.data());
     auto top_n_param = req->param(TOP_N);
 
     size_t top_n = DEFAULT_TOP_N;
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 4a34605aa33..99205368616 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -325,14 +325,14 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
             request.__set_group_commit_mode("sync_mode");
         }
     }
-    if (_exec_env->master_info()->__isset.backend_id) {
-        request.__set_backend_id(_exec_env->master_info()->backend_id);
+    if (_exec_env->cluster_info()->backend_id != 0) {
+        request.__set_backend_id(_exec_env->cluster_info()->backend_id);
     } else {
-        LOG(WARNING) << "_exec_env->master_info not set backend_id";
+        LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
     }
 
     // plan this load
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     int64_t stream_load_put_start_time = MonotonicNanos();
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index fa23e5e56c1..e3ad7f44866 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -666,7 +666,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
 
 #ifndef BE_TEST
     // plan this load
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     int64_t stream_load_put_start_time = MonotonicNanos();
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index bf1cd751ae3..c842a4fe2dd 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -26,6 +26,7 @@
 #include "common/config.h"
 #include "http/http_headers.h"
 #include "http/http_status.h"
+#include "runtime/exec_env.h"
 #include "util/stack_util.h"
 
 namespace doris {
@@ -141,6 +142,9 @@ Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
         return Status::InternalError("fail to set CURLOPT_URL");
     }
 
+#ifndef BE_TEST
+    set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token);
+#endif
     return Status::OK();
 }
 
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index f6a1a17ec29..fb692c50268 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -26,6 +26,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "http/http_headers.h"
 #include "http/http_method.h"
 
 namespace doris {
@@ -55,6 +56,13 @@ public:
         curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str());
     }
 
+    // Auth-Token: xxxx
+    void set_auth_token(const std::string& token) {
+        std::string scratch_str = HttpHeaders::AUTH_TOKEN + ": " + token;
+        _header_list = curl_slist_append(_header_list, scratch_str.c_str());
+        curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list);
+    }
+
     // content_type such as "application/json"
     void set_content_type(const std::string content_type) {
         std::string scratch_str = "Content-Type: " + content_type;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index ec2dfc896e4..b9b5f0d85ae 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -66,7 +66,7 @@ static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
 static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
 static const std::string HTTP_LOAD_STREAM_PER_NODE = "load_stream_per_node";
 static const std::string HTTP_WAL_ID_KY = "wal_id";
-static const std::string HTTP_AUTH_CODE = "auth_code";
+static const std::string HTTP_AUTH_CODE = "auth_code"; // deprecated
 static const std::string HTTP_GROUP_COMMIT = "group_commit";
 static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
 
diff --git a/be/src/http/http_handler_with_auth.cpp b/be/src/http/http_handler_with_auth.cpp
index 166638ab318..518b9868de1 100644
--- a/be/src/http/http_handler_with_auth.cpp
+++ b/be/src/http/http_handler_with_auth.cpp
@@ -52,6 +52,23 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
         return -1;
     }
 
+    // check auth by token
+    if (auth_info.token != "") {
+#ifdef BE_TEST
+        if (auth_info.token == "valid_token") {
+            return 0;
+#else
+        if (_exec_env->check_auth_token(auth_info.token)) {
+            return 0;
+#endif
+        } else {
+            LOG(WARNING) << "invalid auth token, request: " << req->debug_string();
+            HttpChannel::send_error(req, HttpStatus::BAD_REQUEST);
+            return -1;
+        }
+    }
+
+    // check auth by user/password
     auth_request.user = auth_info.user;
     auth_request.passwd = auth_info.passwd;
     auth_request.__set_cluster(auth_info.cluster);
@@ -65,7 +82,7 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
     }
 
 #ifndef BE_TEST
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     {
         auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                 master_addr.hostname, master_addr.port,
diff --git a/be/src/http/http_headers.cpp b/be/src/http/http_headers.cpp
index 79d9240c163..4fc5338145c 100644
--- a/be/src/http/http_headers.cpp
+++ b/be/src/http/http_headers.cpp
@@ -93,6 +93,7 @@ const char* HttpHeaders::WEBSOCKET_ORIGIN = "WebSocket-Origin";
 const char* HttpHeaders::WEBSOCKET_PROTOCOL = "WebSocket-Protocol";
 const char* HttpHeaders::WWW_AUTHENTICATE = "WWW-Authenticate";
 
-const std::string HttpHeaders::JsonType = "application/json";
+const std::string HttpHeaders::JSON_TYPE = "application/json";
+const std::string HttpHeaders::AUTH_TOKEN = "Auth-Token";
 
 } // namespace doris
diff --git a/be/src/http/http_headers.h b/be/src/http/http_headers.h
index e2f9547ac9d..0fdfc1be22c 100644
--- a/be/src/http/http_headers.h
+++ b/be/src/http/http_headers.h
@@ -97,7 +97,8 @@ public:
     static const char* WEBSOCKET_PROTOCOL;
     static const char* WWW_AUTHENTICATE;
 
-    static const std::string JsonType;
+    static const std::string JSON_TYPE;
+    static const std::string AUTH_TOKEN;
 };
 
 } // namespace doris
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index fbbc1cd93bf..f91610476b4 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -54,6 +54,8 @@ std::string encode_basic_auth(const std::string& user, const std::string& passwd
 }
 
 bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* passwd) {
+    // const auto& token = req.header(HttpHeaders::AUTH_TOKEN);
+
     const char k_basic[] = "Basic ";
     const auto& auth = req.header(HttpHeaders::AUTHORIZATION);
     if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) {
@@ -75,8 +77,11 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa
 }
 
 bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
+    // deprecated, removed in 3.1, use AUTH_TOKEN
     const auto& token = req.header("token");
+    // deprecated, removed in 3.1, use AUTH_TOKEN
     const auto& auth_code = req.header(HTTP_AUTH_CODE);
+    const auto& auth_token = req.header(HttpHeaders::AUTH_TOKEN);
 
     std::tuple<std::string, std::string, std::string> tmp;
     auto& [user, pass, cluster] = tmp;
@@ -93,9 +98,11 @@ bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
     }
 
     if (!token.empty()) {
-        auth->token = token;
+        auth->token = token; // deprecated
+    } else if (!auth_token.empty()) {
+        auth->token = auth_token;
     } else if (!auth_code.empty()) {
-        auth->auth_code = std::stoll(auth_code);
+        auth->auth_code = std::stoll(auth_code); // deprecated
     } else if (!valid_basic_auth) {
         return false;
     }
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 70ad64a81ce..b3af2531f15 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -197,7 +197,7 @@ Status MultiTablePipe::request_and_exec_plans() {
 
         // plan this load
         ExecEnv* exec_env = doris::ExecEnv::GetInstance();
-        TNetworkAddress master_addr = exec_env->master_info()->network_address;
+        TNetworkAddress master_addr = exec_env->cluster_info()->master_fe_addr;
         int64_t stream_load_put_start_time = MonotonicNanos();
         RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                 master_addr.hostname, master_addr.port,
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 020d151d16b..96dc9295834 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -723,13 +723,13 @@ void StorageEngine::_update_replica_infos_callback() {
                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
                    
t->tablet_meta()->tablet_schema()->enable_single_replica_compaction();
         });
-        TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
-        if (master_info == nullptr) {
+        ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
+        if (cluster_info == nullptr) {
             LOG(WARNING) << "Have not get FE Master heartbeat yet";
             std::this_thread::sleep_for(std::chrono::seconds(2));
             continue;
         }
-        TNetworkAddress master_addr = master_info->network_address;
+        TNetworkAddress master_addr = cluster_info->master_fe_addr;
         if (master_addr.hostname == "" || master_addr.port == 0) {
             LOG(WARNING) << "Have not get FE Master heartbeat yet";
             std::this_thread::sleep_for(std::chrono::seconds(2));
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 1fc5b7278c6..fc3a69fd5cd 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -143,13 +143,13 @@ Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
     } while (false)
 
 EngineCloneTask::EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_req,
-                                 const TMasterInfo& master_info, int64_t signature,
+                                 const ClusterInfo* cluster_info, int64_t signature,
                                  std::vector<TTabletInfo>* tablet_infos)
         : _engine(engine),
           _clone_req(clone_req),
           _tablet_infos(tablet_infos),
           _signature(signature),
-          _master_info(master_info) {
+          _cluster_info(cluster_info) {
     _mem_tracker = MemTrackerLimiter::create_shared(
             MemTrackerLimiter::Type::OTHER,
             "EngineCloneTask#tabletId=" + 
std::to_string(_clone_req.tablet_id));
@@ -356,7 +356,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
                                                      bool* 
allow_incremental_clone) {
     Status status;
 
-    const auto& token = _master_info.token;
+    const auto& token = _cluster_info->token;
 
     int timeout_s = 0;
     if (_clone_req.__isset.timeout_s) {
diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h
index 3161b803c82..9290ed9552e 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -31,11 +31,11 @@
 namespace doris {
 class DataDir;
 class TCloneReq;
-class TMasterInfo;
 class TTabletInfo;
 class Tablet;
 struct Version;
 class StorageEngine;
+class ClusterInfo;
 
 const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?";
 const std::string HTTP_REQUEST_TOKEN_PARAM = "token=";
@@ -51,7 +51,7 @@ public:
     Status execute() override;
 
     EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_req,
-                    const TMasterInfo& master_info, int64_t signature,
+                    const ClusterInfo* cluster_info, int64_t signature,
                     std::vector<TTabletInfo>* tablet_infos);
     ~EngineCloneTask() override = default;
 
@@ -93,7 +93,7 @@ private:
     const TCloneReq& _clone_req;
     std::vector<TTabletInfo>* _tablet_infos = nullptr;
     int64_t _signature;
-    const TMasterInfo& _master_info;
+    const ClusterInfo* _cluster_info;
     int64_t _copy_size;
     int64_t _copy_time_ms;
     std::vector<PendingRowsetGuard> _pending_rs_guards;
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index a7e33e7383f..03343603ae8 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -213,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t 
table_id, int64_t wal_
     base_path = _wal_dirs_info->get_available_random_wal_dir();
     std::stringstream ss;
     ss << base_path << "/" << std::to_string(db_id) << "/" << 
std::to_string(table_id) << "/"
-       << std::to_string(wal_version) << "_" << 
_exec_env->master_info()->backend_id << "_"
+       << std::to_string(wal_version) << "_" << 
_exec_env->cluster_info()->backend_id << "_"
        << std::to_string(wal_id) << "_" << label;
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
@@ -377,8 +377,8 @@ Status WalManager::_replay_background() {
             break;
         }
         // port == 0 means not received heartbeat yet
-        if (_exec_env->master_info() != nullptr &&
-            _exec_env->master_info()->network_address.port == 0) {
+        if (_exec_env->cluster_info() != nullptr &&
+            _exec_env->cluster_info()->master_fe_addr.port == 0) {
             continue;
         }
         // replay residual wal,only replay once
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 69453054d18..84cf7afd4d3 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -161,12 +161,15 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> 
wal_info) {
 
 Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
     TLoadTxnRollbackRequest request;
-    request.__set_auth_code(0); // this is a fake, fe not check it now
+    // auth_code is a placeholder value; FE does not check it now.
+    // It should be removed in 3.1; use token instead.
+    request.__set_auth_code(0);
+    request.__set_token(_exec_env->cluster_info()->curr_auth_token);
     request.__set_db_id(db_id);
     request.__set_label(label);
     request.__set_reason("relay wal with label " + label);
     TLoadTxnRollbackResult result;
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
             [&request, &result](FrontendServiceConnection& client) {
@@ -235,7 +238,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const 
std::string& wal,
     ctx->wal_id = wal_id;
     ctx->label = label;
     ctx->need_commit_self = false;
-    ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now
+    ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
     ctx->auth.user = "admin";
     ctx->group_commit = false;
     ctx->load_type = TLoadType::MANUL_LOAD;
@@ -245,6 +248,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const 
std::string& wal,
         // wait stream load finish
         RETURN_IF_ERROR(ctx->future.get());
         if (ctx->status.ok()) {
+            // deprecated and should be removed in 3.1, use token instead.
             ctx->auth.auth_code = wal_id;
             st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
         } else {
@@ -293,7 +297,7 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id,
     request.__set_table_id(tb_id);
     TGetColumnInfoResult result;
     Status status;
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     if (master_addr.hostname.empty() || master_addr.port == 0) {
         status = Status::InternalError<false>("Have not get FE Master 
heartbeat yet");
     } else {
@@ -327,4 +331,4 @@ Status WalTable::_read_wal_header(const std::string& 
wal_path, std::string& colu
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/cluster_info.h b/be/src/runtime/cluster_info.h
new file mode 100644
index 00000000000..da66b4f0acf
--- /dev/null
+++ b/be/src/runtime/cluster_info.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/Types_types.h>
+
+#include <string>
+
+namespace doris {
+
+// Holds the cluster-level info kept by this BE: cluster id, Master FE
+// address and http port, cluster token, backend id and internal auth tokens.
+// This info is usually carried by the heartbeat from the Master FE.
+class ClusterInfo {
+public:
+    // Unique cluster id
+    int32_t cluster_id = 0;
+    // Master FE addr: ip:rpc_port
+    TNetworkAddress master_fe_addr;
+    // Master FE http_port
+    int32_t master_fe_http_port = 0;
+    // Unique cluster token
+    std::string token = "";
+    // Backend ID; stays 0 until it has been set
+    int64_t backend_id = 0;
+
+    // Auth tokens for internal authentication.
+    // Keep the last 2 tokens so the previous one is still accepted during a token update
+    std::string curr_auth_token = "";
+    std::string last_auth_token = "";
+};
+
+} // namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 872069ee70a..ab24d7ca192 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -65,12 +65,16 @@ Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t 
tablet_id) {
 }
 
 const std::string& ExecEnv::token() const {
-    return _master_info->token;
+    return _cluster_info->token;
 }
 
-std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() {
+std::vector<TFrontendInfo> ExecEnv::get_frontends() {
     std::lock_guard<std::mutex> lg(_frontends_lock);
-    return _frontends;
+    std::vector<TFrontendInfo> infos;
+    for (const auto& cur_fe : _frontends) {
+        infos.push_back(cur_fe.second.info);
+    }
+    return infos;
 }
 
 void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) 
{
@@ -173,4 +177,15 @@ void ExecEnv::wait_for_all_tasks_done() {
     }
 }
 
+// Returns true when `auth_token` equals the current or the last internal auth
+// token stored in ClusterInfo. Both stored tokens default to "", so an empty
+// token is rejected up front instead of matching an unset slot.
+bool ExecEnv::check_auth_token(const std::string& auth_token) {
+    if (_cluster_info == nullptr || auth_token.empty()) {
+        return false;
+    }
+    return _cluster_info->curr_auth_token == auth_token ||
+           _cluster_info->last_auth_token == auth_token;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 307c2586fc6..4a0000fa19f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -33,6 +33,7 @@
 #include "olap/rowset/segment_v2/inverted_index_writer.h"
 #include "olap/tablet_fwd.h"
 #include "pipeline/pipeline_tracing.h"
+#include "runtime/cluster_info.h"
 #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove 
this include header
 #include "util/threadpool.h"
 
@@ -83,7 +84,6 @@ class BaseStorageEngine;
 class ResultBufferMgr;
 class ResultQueueMgr;
 class RuntimeQueryStatisticsMgr;
-class TMasterInfo;
 class LoadChannelMgr;
 class LoadStreamMgr;
 class LoadStreamMapPool;
@@ -219,7 +219,7 @@ public:
     UserFunctionCache* user_function_cache() { return _user_function_cache; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     ResultCache* result_cache() { return _result_cache; }
-    TMasterInfo* master_info() { return _master_info; }
+    ClusterInfo* cluster_info() { return _cluster_info; }
     LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
     BfdParser* bfd_parser() const { return _bfd_parser; }
     BrokerMgr* broker_mgr() const { return _broker_mgr; }
@@ -262,7 +262,7 @@ public:
     void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
         _memtable_memory_limiter.reset(limiter);
     }
-    void set_master_info(TMasterInfo* master_info) { this->_master_info = 
master_info; }
+    void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = 
cluster_info; }
     void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> 
new_load_stream_mgr) {
         this->_new_load_stream_mgr = new_load_stream_mgr;
     }
@@ -299,7 +299,7 @@ public:
     void wait_for_all_tasks_done();
 
     void update_frontends(const std::vector<TFrontendInfo>& new_infos);
-    std::map<TNetworkAddress, FrontendInfo> get_frontends();
+    std::vector<TFrontendInfo> get_frontends();
     std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
 
     TabletSchemaCache* get_tablet_schema_cache() { return 
_tablet_schema_cache; }
@@ -333,6 +333,8 @@ public:
     orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
     arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }
 
+    bool check_auth_token(const std::string& auth_token);
+
 private:
     ExecEnv();
 
@@ -403,7 +405,7 @@ private:
     WorkloadGroupMgr* _workload_group_manager = nullptr;
 
     ResultCache* _result_cache = nullptr;
-    TMasterInfo* _master_info = nullptr;
+    ClusterInfo* _cluster_info = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
 
     BfdParser* _bfd_parser = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 062069044dc..9d761786611 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -278,7 +278,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
                                     config::query_cache_elasticity_size_mb);
-    _master_info = new TMasterInfo();
+    _cluster_info = new ClusterInfo();
     _load_path_mgr = new LoadPathMgr(this);
     _bfd_parser = BfdParser::create();
     _broker_mgr = new BrokerMgr(this);
@@ -759,9 +759,9 @@ void ExecEnv::destroy() {
 
     // Master Info is a thrift object, it could be the last one to deconstruct.
     // Master info should be deconstruct later than fragment manager, because 
fragment will
-    // access master_info.backend id to access some info. If there is a 
running query and master
+    // access cluster_info.backend_id to access some info. If there is a 
running query and master
     // info is deconstructed then BE process will core at coordinator back 
method in fragment mgr.
-    SAFE_DELETE(_master_info);
+    SAFE_DELETE(_cluster_info);
 
     // NOTE: runtime query statistics mgr could be visited by query and daemon 
thread
     // so it should be created before all query begin and deleted after all 
query and daemon thread stoppped
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e683b84e2b4..86a3a8e773d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -526,8 +526,8 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
     req.runtime_state->get_unreported_errors(&(params.error_log));
     params.__isset.error_log = (!params.error_log.empty());
 
-    if (_exec_env->master_info()->__isset.backend_id) {
-        params.__set_backend_id(_exec_env->master_info()->backend_id);
+    if (_exec_env->cluster_info()->backend_id != 0) {
+        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
     }
 
     TReportExecStatusResult res;
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index cd54718bc5f..f06f26b6418 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -351,12 +351,12 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
         request.__set_strictMode(false);
         // this is an internal interface, use admin to pass the auth check
         request.__set_user("admin");
-        if (_exec_env->master_info()->__isset.backend_id) {
-            request.__set_backend_id(_exec_env->master_info()->backend_id);
+        if (_exec_env->cluster_info()->backend_id != 0) {
+            request.__set_backend_id(_exec_env->cluster_info()->backend_id);
         } else {
-            LOG(WARNING) << "_exec_env->master_info not set backend_id";
+            LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
         }
-        TNetworkAddress master_addr = 
_exec_env->master_info()->network_address;
+        TNetworkAddress master_addr = 
_exec_env->cluster_info()->master_fe_addr;
         st = ThriftRpcHelper::rpc<FrontendServiceClient>(
                 master_addr.hostname, master_addr.port,
                 [&result, &request](FrontendServiceConnection& client) {
@@ -440,23 +440,25 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
                         { status = Status::InternalError(""); });
         // commit txn
         TLoadTxnCommitRequest request;
-        request.__set_auth_code(0); // this is a fake, fe not check it now
+        // deprecated and should be removed in 3.1, use token instead
+        request.__set_auth_code(0);
+        request.__set_token(_exec_env->cluster_info()->curr_auth_token);
         request.__set_db_id(db_id);
         request.__set_table_id(table_id);
         request.__set_txnId(txn_id);
         request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
         request.__set_groupCommit(true);
         request.__set_receiveBytes(state->num_bytes_load_total());
-        if (_exec_env->master_info()->__isset.backend_id) {
-            request.__set_backendId(_exec_env->master_info()->backend_id);
+        if (_exec_env->cluster_info()->backend_id != 0) {
+            request.__set_backendId(_exec_env->cluster_info()->backend_id);
         } else {
-            LOG(WARNING) << "_exec_env->master_info not set backend_id";
+            LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
         }
         if (state) {
             request.__set_commitInfos(state->tablet_commit_infos());
         }
         TLoadTxnCommitResult result;
-        TNetworkAddress master_addr = 
_exec_env->master_info()->network_address;
+        TNetworkAddress master_addr = 
_exec_env->cluster_info()->master_fe_addr;
         int retry_times = 0;
         while (retry_times < config::mow_stream_load_commit_retry_times) {
             st = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -482,12 +484,14 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     } else {
         // abort txn
         TLoadTxnRollbackRequest request;
-        request.__set_auth_code(0); // this is a fake, fe not check it now
+        // deprecated and should be removed in 3.1, use token instead
+        request.__set_auth_code(0);
+        request.__set_token(_exec_env->cluster_info()->curr_auth_token);
         request.__set_db_id(db_id);
         request.__set_txnId(txn_id);
         request.__set_reason(status.to_string());
         TLoadTxnRollbackResult result;
-        TNetworkAddress master_addr = 
_exec_env->master_info()->network_address;
+        TNetworkAddress master_addr = 
_exec_env->cluster_info()->master_fe_addr;
         st = ThriftRpcHelper::rpc<FrontendServiceClient>(
                 master_addr.hostname, master_addr.port,
                 [&request, &result](FrontendServiceConnection& client) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 2c69b8a5870..06150ae3d20 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -230,7 +230,9 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
     ctx->db = task.db;
     ctx->table = task.tbl;
     ctx->label = task.label;
+    // deprecated, removed in 3.1, use auth token instead.
     ctx->auth.auth_code = task.auth_code;
+    ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
 
     if (task.__isset.max_interval_s) {
         ctx->max_interval_s = task.max_interval_s;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 77fd80cd528..f07b308850e 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -162,7 +162,7 @@ TReportExecStatusParams 
RuntimeQueryStatisticsMgr::create_report_exec_status_par
 
     TReportExecStatusParams req;
     THRIFT_MOVE_VALUES(req, query_profile, profile);
-    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id);
     // invalid query id to avoid API compatibility during upgrade
     req.__set_query_id(TUniqueId());
     req.__set_done(is_done);
@@ -341,7 +341,7 @@ void 
RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
 }
 
 void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
-    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
     // 1 get query statistics map
     std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> 
fe_qs_map;
     std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, 
timeout>
@@ -515,7 +515,7 @@ void 
RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int6
 
 void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* 
block) {
     std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
 
     // block's schema come from 
SchemaBackendActiveTasksScanner::_s_tbls_columns
     for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
diff --git a/be/src/runtime/small_file_mgr.cpp 
b/be/src/runtime/small_file_mgr.cpp
index 19a05f9c9dc..15d2c937be2 100644
--- a/be/src/runtime/small_file_mgr.cpp
+++ b/be/src/runtime/small_file_mgr.cpp
@@ -169,10 +169,10 @@ Status SmallFileMgr::_download_file(int64_t file_id, 
const std::string& md5,
     HttpClient client;
 
     std::stringstream url_ss;
-    TMasterInfo* master_info = _exec_env->master_info();
-    url_ss << master_info->network_address.hostname << ":" << 
master_info->http_port
+    ClusterInfo* cluster_info = _exec_env->cluster_info();
+    url_ss << cluster_info->master_fe_addr.hostname << ":" << 
cluster_info->master_fe_http_port
            << "/api/get_small_file?"
-           << "file_id=" << file_id << "&token=" << master_info->token;
+           << "file_id=" << file_id << "&token=" << cluster_info->token;
 
     std::string url = url_ss.str();
 
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index d04a5463879..784904c78a3 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -942,7 +942,7 @@ Status SnapshotLoader::_report_every(int report_threshold, 
int* counter, int32_t
     LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << 
_task_id
               << ", finished num: " << finished_num << ", total num:" << 
total_num;
 
-    TNetworkAddress master_addr = _env->master_info()->network_address;
+    TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr;
 
     TSnapshotLoaderReportRequest request;
     request.job_id = _job_id;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index ec83141893a..482fadac44e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -173,12 +173,12 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
         request.__set_timeout(ctx->timeout_second);
     }
     request.__set_request_id(ctx->id.to_thrift());
-    request.__set_backend_id(_exec_env->master_info()->backend_id);
+    request.__set_backend_id(_exec_env->cluster_info()->backend_id);
 
     TLoadTxnBeginResult result;
     Status status;
     int64_t duration_ns = 0;
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     if (master_addr.hostname.empty() || master_addr.port == 0) {
         status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master 
heartbeat yet");
     } else {
@@ -215,7 +215,7 @@ Status 
StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
     TLoadTxnCommitRequest request;
     get_commit_request(ctx, request);
 
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnCommitResult result;
     int64_t duration_ns = 0;
     {
@@ -260,7 +260,7 @@ Status 
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
         request.__set_txnId(ctx->txn_id);
     }
 
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxn2PCResult result;
     int64_t duration_ns = 0;
     {
@@ -312,7 +312,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* 
ctx) {
     TLoadTxnCommitRequest request;
     get_commit_request(ctx, request);
 
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnCommitResult result;
 #ifndef BE_TEST
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -344,7 +344,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* 
ctx) {
 void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
     
DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1);
 
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnRollbackRequest request;
     set_request_auth(&request, ctx->auth);
     request.__set_db(ctx->db);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 003f07f1db0..927d4d13814 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -254,7 +254,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
 }
 
 void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
-    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
     int cpu_num = CpuInfo::num_cores();
     cpu_num = cpu_num <= 0 ? 1 : cpu_num;
     uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll;
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index e6fdfaa8765..abdef513296 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -599,7 +599,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
 } // namespace
 
 BaseBackendService::BaseBackendService(ExecEnv* exec_env)
-        : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, 
*exec_env->master_info())) {}
+        : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, 
exec_env->cluster_info())) {}
 
 BaseBackendService::~BaseBackendService() = default;
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index dcc76259868..00935e8fc64 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -83,9 +83,7 @@
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
 
-namespace doris {
-class TMasterInfo;
-} // namespace doris
+namespace doris {} // namespace doris
 
 static void help(const char*);
 
@@ -579,11 +577,11 @@ int main(int argc, char** argv) {
     stop_work_if_error(status, "Doris Be http service did not start correctly, 
exiting");
 
     // 4. heart beat server
-    doris::TMasterInfo* master_info = exec_env->master_info();
+    doris::ClusterInfo* cluster_info = exec_env->cluster_info();
     std::unique_ptr<doris::ThriftServer> heartbeat_thrift_server;
     doris::Status heartbeat_status = doris::create_heartbeat_server(
             exec_env, doris::config::heartbeat_service_port, 
&heartbeat_thrift_server,
-            doris::config::heartbeat_service_thread_count, master_info);
+            doris::config::heartbeat_service_thread_count, cluster_info);
 
     stop_work_if_error(heartbeat_status, "Heartbeat services did not start 
correctly, exiting");
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 89b43ec5223..ae84081813f 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -500,7 +500,7 @@ Status PInternalService::_exec_plan_fragment_impl(
         const std::function<void(RuntimeState*, Status*)>& cb) {
     // Sometimes the BE do not receive the first heartbeat message and it 
receives request from FE
     // If BE execute this fragment, it will core when it wants to get some 
property from master info.
-    if (ExecEnv::GetInstance()->master_info() == nullptr) {
+    if (ExecEnv::GetInstance()->cluster_info() == nullptr) {
         return Status::InternalError(
                 "Have not receive the first heartbeat message from master, not 
ready to provide "
                 "service");
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 180d18b0c2c..289930b16bc 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -278,7 +278,7 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
 
     // _state->execution_timeout() is seconds, change to milliseconds
     int time_out = _state->execution_timeout() * 1000;
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     TFetchSchemaTableDataResult result;
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
diff --git a/be/src/vec/sink/autoinc_buffer.cpp 
b/be/src/vec/sink/autoinc_buffer.cpp
index 4bc87dff489..80ce9d494d5 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -46,7 +46,7 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t 
batch_size) {
 Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
     constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
     _rpc_status = Status::OK();
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     for (uint32_t retry_times = 0; retry_times < 
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
         TAutoIncrementRangeRequest request;
         TAutoIncrementRangeResult result;
@@ -167,4 +167,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t 
length) {
     return Status::OK();
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 74a2830a191..2de21edd80b 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -102,7 +102,7 @@ Status VRowDistribution::automatic_create_partition() {
     request.__set_be_endpoint(be_endpoint);
 
     VLOG_NOTICE << "automatic partition rpc begin request " << request;
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     int time_out = _state->execution_timeout() * 1000;
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
@@ -175,7 +175,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
     request.__set_be_endpoint(be_endpoint);
 
     VLOG_NOTICE << "auto detect replace partition request: " << request;
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     int time_out = _state->execution_timeout() * 1000;
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
diff --git a/be/test/agent/heartbeat_server_test.cpp 
b/be/test/agent/heartbeat_server_test.cpp
index db3a7a2eb78..f0c93a8caca 100644
--- a/be/test/agent/heartbeat_server_test.cpp
+++ b/be/test/agent/heartbeat_server_test.cpp
@@ -35,12 +35,12 @@ namespace doris {
 TEST(HeartbeatTest, TestHeartbeat) {
     setenv("DORIS_HOME", "./", 1);
     THeartbeatResult heartbeat_result;
-    TMasterInfo ori_master_info;
-    ori_master_info.cluster_id = -1;
-    ori_master_info.epoch = 0;
-    ori_master_info.network_address.hostname = "";
-    ori_master_info.network_address.port = 0;
-    HeartbeatServer heartbeat_server(&ori_master_info);
+    ClusterInfo ori_cluster_info;
+    ori_cluster_info.cluster_id = -1;
+    // Empty master FE address (port == 0): no heartbeat has been received yet.
+    ori_cluster_info.master_fe_addr.hostname = "";
+    ori_cluster_info.master_fe_addr.port = 0;
+    HeartbeatServer heartbeat_server(&ori_cluster_info);
 
     heartbeat_server.heartbeat(heartbeat_result, master_info);
     EXPECT_EQ(TStatusCode::OK, heartbeat_result.status.status_code);
diff --git a/be/test/agent/mock_utils.h b/be/test/agent/mock_utils.h
index 3a14ca79626..b49ff3e372a 100644
--- a/be/test/agent/mock_utils.h
+++ b/be/test/agent/mock_utils.h
@@ -33,7 +33,7 @@ public:
 
 class MockMasterServerClient : public MasterServerClient {
 public:
-    MockMasterServerClient(const TMasterInfo& master_info,
+    MockMasterServerClient(const ClusterInfo* cluster_info,
                            FrontendServiceClientCache* client_cache);
     MOCK_METHOD2(finish_task, Status(const TFinishTaskRequest request, 
TMasterResult* result));
     MOCK_METHOD2(report, Status(const TReportRequest request, TMasterResult* 
result));
diff --git a/be/test/agent/task_worker_pool_test.cpp 
b/be/test/agent/task_worker_pool_test.cpp
index 0c7fd88396b..7be29e6feb4 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -27,6 +27,7 @@
 
 #include "olap/options.h"
 #include "olap/storage_engine.h"
+#include "runtime/cluster_info.h"
 
 namespace doris {
 
@@ -106,14 +107,14 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
     
ExecEnv::GetInstance()->set_storage_engine(std::make_unique<StorageEngine>(EngineOptions
 {}));
     Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};
 
-    TMasterInfo master_info;
+    ClusterInfo cluster_info;
     std::atomic_int count {0};
-    ReportWorker worker("test", master_info, 1, [&] { ++count; });
+    ReportWorker worker("test", &cluster_info, 1, [&] { ++count; });
 
     worker.notify(); // Not received heartbeat yet, ignore
     std::this_thread::sleep_for(100ms);
 
-    master_info.network_address.__set_port(9030);
+    cluster_info.master_fe_addr.__set_port(9030);
     worker.notify();
     std::this_thread::sleep_for(100ms);
     EXPECT_EQ(count.load(), 1);
diff --git a/be/test/http/http_client_test.cpp 
b/be/test/http/http_client_test.cpp
index 4e2ada03c66..d42e0a6775e 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -140,7 +140,7 @@ TEST_F(HttpClientTest, get_normal) {
     client.set_basic_auth("test1", "");
     std::string response;
     st = client.execute(&response);
-    EXPECT_TRUE(st.ok());
+    EXPECT_TRUE(st.ok()) << st;
     EXPECT_STREQ("test1", response.c_str());
 
     // for head
@@ -380,7 +380,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
         std::cout << "st = " << st << "\n";
         std::cout << "response = " << response << "\n";
         std::cout << "st.msg() = " << st.msg() << "\n";
-        EXPECT_TRUE(!st.ok());
+        EXPECT_TRUE(!st.ok()) << st;
         EXPECT_TRUE(st.msg().find("The requested URL returned error") != 
std::string::npos);
     }
 
@@ -477,6 +477,36 @@ TEST_F(HttpClientTest, enable_http_auth) {
         EXPECT_TRUE(st.msg().find("Operation timed out after") != 
std::string::npos);
     }
 
+    // valid token
+    {
+        config::enable_all_http_auth = true;
+        std::string url = hostname + "/metrics";
+        HttpClient client;
+        auto st = client.init(url);
+        EXPECT_TRUE(st.ok());
+        client.set_method(GET);
+        client.set_auth_token("valid_token");
+        client.set_timeout_ms(200);
+        std::string response;
+        st = client.execute(&response);
+        EXPECT_TRUE(st.ok()) << st;
+    }
+
+    // invalid token
+    {
+        config::enable_all_http_auth = true;
+        std::string url = hostname + "/metrics";
+        HttpClient client;
+        auto st = client.init(url);
+        EXPECT_TRUE(st.ok());
+        client.set_method(GET);
+        client.set_auth_token("invalid_token");
+        client.set_timeout_ms(200);
+        std::string response;
+        st = client.execute(&response);
+        EXPECT_TRUE(!st.ok()) << st;
+    }
+
     {
         config::enable_all_http_auth = true;
         std::string url = hostname + "/api/glog/adjust";
diff --git a/be/test/olap/wal/wal_manager_test.cpp 
b/be/test/olap/wal/wal_manager_test.cpp
index afb4c7696ef..32162593fc0 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -55,10 +55,10 @@ public:
     void SetUp() override {
         prepare();
         _env = ExecEnv::GetInstance();
-        _env->_master_info = new TMasterInfo();
-        _env->_master_info->network_address.hostname = "host name";
-        _env->_master_info->network_address.port = 1234;
-        _env->_master_info->backend_id = 1001;
+        _env->_cluster_info = new ClusterInfo();
+        _env->_cluster_info->master_fe_addr.hostname = "host name";
+        _env->_cluster_info->master_fe_addr.port = 1234;
+        _env->_cluster_info->backend_id = 1001;
         _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
         _env->_internal_client_cache = new 
BrpcClientCache<PBackendService_Stub>();
         _env->_function_client_cache = new 
BrpcClientCache<PFunctionService_Stub>();
@@ -77,7 +77,7 @@ public:
         SAFE_STOP(_env->_wal_manager);
         SAFE_DELETE(_env->_function_client_cache);
         SAFE_DELETE(_env->_internal_client_cache);
-        SAFE_DELETE(_env->_master_info);
+        SAFE_DELETE(_env->_cluster_info);
     }
 
     void prepare() {
@@ -242,4 +242,4 @@ TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
     EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), 
Status::InternalError(""));
     EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
 }
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp 
b/be/test/runtime/routine_load_task_executor_test.cpp
index 338b82c6eba..5c2b39bce1f 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -55,7 +55,7 @@ public:
         k_stream_load_rollback_result = TLoadTxnRollbackResult();
         k_stream_load_put_result = TStreamLoadPutResult();
 
-        _env.set_master_info(new TMasterInfo());
+        _env.set_cluster_info(new ClusterInfo());
         _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
         
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
 
@@ -63,7 +63,7 @@ public:
         config::max_consumer_num_per_group = 3;
     }
 
-    void TearDown() override { delete _env.master_info(); }
+    void TearDown() override { delete _env.cluster_info(); }
 
     ExecEnv _env;
 };
@@ -121,4 +121,4 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     executor.stop();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp 
b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 1d565c7e0ce..4b6ce46bd88 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -26,6 +26,7 @@
 #include "io/fs/local_file_system.h"
 #include "olap/wal/wal_manager.h"
 #include "pipeline/exec/file_scan_operator.h"
+#include "runtime/cluster_info.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
@@ -107,7 +108,7 @@ private:
     TFileRangeDesc _range_desc;
     TFileScanRange _scan_range;
     std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
-    std::unique_ptr<TMasterInfo> _master_info = nullptr;
+    std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
 };
 
 void VfileScannerExceptionTest::_init_desc_table() {
@@ -266,12 +267,12 @@ void VfileScannerExceptionTest::init() {
     _scan_range.params.format_type = TFileFormatType::FORMAT_JNI;
     _kv_cache.reset(new ShardedKVCache(48));
 
-    _master_info.reset(new TMasterInfo());
+    _cluster_info.reset(new ClusterInfo());
     _env = ExecEnv::GetInstance();
-    _env->_master_info = _master_info.get();
-    _env->_master_info->network_address.hostname = "host name";
-    _env->_master_info->network_address.port = _backend_id;
-    _env->_master_info->backend_id = 1001;
+    _env->_cluster_info = _cluster_info.get();
+    _env->_cluster_info->master_fe_addr.hostname = "host name";
+    _env->_cluster_info->master_fe_addr.port = _backend_id;
+    _env->_cluster_info->backend_id = 1001;
     _env->_wal_manager = 0;
 }
 
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index 0944193e6d0..5c4056a8c24 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -26,6 +26,7 @@
 #include "io/fs/local_file_system.h"
 #include "olap/wal/wal_manager.h"
 #include "pipeline/exec/file_scan_operator.h"
+#include "runtime/cluster_info.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
@@ -116,7 +117,7 @@ private:
     TFileRangeDesc _range_desc;
     TFileScanRange _scan_range;
     std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
-    std::unique_ptr<TMasterInfo> _master_info = nullptr;
+    std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
 };
 
 void VWalScannerTest::_init_desc_table() {
@@ -279,12 +280,12 @@ void VWalScannerTest::init() {
     _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
     _kv_cache.reset(new ShardedKVCache(48));
 
-    _master_info.reset(new TMasterInfo());
+    _cluster_info.reset(new ClusterInfo());
     _env = ExecEnv::GetInstance();
-    _env->_master_info = _master_info.get();
-    _env->_master_info->network_address.hostname = "host name";
-    _env->_master_info->network_address.port = _backend_id;
-    _env->_master_info->backend_id = 1001;
+    _env->_cluster_info = _cluster_info.get();
+    _env->_cluster_info->master_fe_addr.hostname = "host name";
+    _env->_cluster_info->master_fe_addr.port = _backend_id;
+    _env->_cluster_info->backend_id = 1001;
     _env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
     std::string base_path;
     auto st = _env->_wal_manager->_init_wal_dirs_info();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 26c3cc1b1b4..e48b514cdd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -571,6 +571,8 @@ public class Env {
 
     private final List<String> forceSkipJournalIds = 
Arrays.asList(Config.force_skip_journal_ids);
 
+    private TokenManager tokenManager;
+
     // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
     private final Map<String, Supplier<MasterDaemon>> configtoThreads = 
ImmutableMap
             .of("dynamic_partition_check_interval_seconds", 
this::getDynamicPartitionScheduler);
@@ -817,6 +819,7 @@ public class Env {
         this.sqlCacheManager = new NereidsSqlCacheManager();
         this.splitSourceManager = new SplitSourceManager();
         this.globalExternalTransactionInfoMgr = new 
GlobalExternalTransactionInfoMgr();
+        this.tokenManager = new TokenManager();
     }
 
     public static void destroyCheckpoint() {
@@ -1852,6 +1855,7 @@ public class Env {
     // start threads that should run on all FE
     protected void startNonMasterDaemonThreads() {
         // start load manager thread
+        tokenManager.start();
         loadManager.start();
         tabletStatMgr.start();
 
@@ -4368,6 +4372,10 @@ public class Env {
         return loadManager;
     }
 
+    public TokenManager getTokenManager() {
+        return tokenManager;
+    }
+
     public ProgressManager getProgressManager() {
         return progressManager;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
similarity index 95%
rename from 
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
rename to fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
index 21e9f9b0434..c6c6911a044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.catalog;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
@@ -39,6 +38,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is used to manage the token for internal authentication.
+ * It will generate a new token every 12 hours(Config.token_generate_period_hour)
+ * and keep at most 6 tokens(Config.token_queue_size).
+ * So each token will be valid for 3 days.
+ * Only Master FE can generate a new token.
+ */
 public class TokenManager {
     private static final Logger LOG = LogManager.getLogger(TokenManager.class);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 70492e5eab2..2f9efc1ed1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -575,7 +575,7 @@ public class LoadAction extends RestBaseController {
     // So this function is not widely tested under general scenario
     private boolean checkClusterToken(String token) {
         try {
-            return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+            return Env.getCurrentEnv().getTokenManager().checkAuthToken(token);
         } catch (UserException e) {
             throw new UnauthorizedException(e.getMessage());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 7887d8c602b..07c459d61cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -101,16 +101,13 @@ public class LoadManager implements Writable {
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private MysqlLoadManager mysqlLoadManager;
-    private TokenManager tokenManager;
 
     public LoadManager(LoadJobScheduler loadJobScheduler) {
         this.loadJobScheduler = loadJobScheduler;
-        this.tokenManager = new TokenManager();
-        this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
+        this.mysqlLoadManager = new MysqlLoadManager();
     }
 
     public void start() {
-        tokenManager.start();
         mysqlLoadManager.start();
     }
 
@@ -184,10 +181,6 @@ public class LoadManager implements Writable {
         return mysqlLoadManager;
     }
 
-    public TokenManager getTokenManager() {
-        return tokenManager;
-    }
-
     public void replayCreateLoadJob(LoadJob loadJob) {
         createLoadJob(loadJob);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", 
"replay create load job").build());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index d09e73b4a33..68dffbfb3e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -76,7 +76,6 @@ public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
     private  ThreadPoolExecutor mysqlLoadPool;
-    private final TokenManager tokenManager;
 
     private static class MySqlLoadContext {
         private boolean finished;
@@ -143,8 +142,7 @@ public class MysqlLoadManager {
     private  EvictingQueue<MySqlLoadFailRecord> failedRecords;
     private ScheduledExecutorService periodScheduler;
 
-    public MysqlLoadManager(TokenManager tokenManager) {
-        this.tokenManager = tokenManager;
+    public MysqlLoadManager() {
     }
 
     public void start() {
@@ -178,7 +176,7 @@ public class MysqlLoadManager {
             VariableMgr.setVar(sessionVariable,
                     new SetVar(SessionVariable.QUERY_TIMEOUT, new 
StringLiteral(String.valueOf(newTimeOut))));
         }
-        String token = tokenManager.acquireToken();
+        String token = Env.getCurrentEnv().getTokenManager().acquireToken();
         boolean clientLocal = dataDesc.isClientLocal();
         MySqlLoadContext loadContext = new MySqlLoadContext();
         loadContextMap.put(loadId, loadContext);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index b00f89db4b5..f4a7259bf71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -136,7 +136,7 @@ public class CanalSyncChannel extends SyncChannel {
                                     FrontendOptions.getLocalHostAddress(),
                                     ExecuteEnv.getInstance().getStartupTime()),
                             sourceType, timeoutSecond);
-                    String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+                    String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
                     request = new TStreamLoadPutRequest()
                         
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
                         
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index cf9a8797161..6b301c16f73 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -198,7 +198,7 @@ public class InsertUtils {
         String label = txnEntry.getLabel();
         try {
             long txnId;
-            String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
             if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
                 txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                         txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), 
label,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 06789b864b6..644bddee58b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -187,7 +187,7 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
                 String token = "";
                 try {
                     // Acquire token from master
-                    token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+                    token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
                 } catch (Exception e) {
                     LOG.warn("Failed to get auth token: {}", e);
                     discardLogNum += auditLogNum;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7b557418b80..9daaf525ca4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2247,10 +2247,10 @@ public class StmtExecutor {
                             ExecuteEnv.getInstance().getStartupTime()),
                     sourceType, timeoutSecond);
             txnConf.setTxnId(txnId);
-            String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
             txnConf.setToken(token);
         } else {
-            String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
             MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(context);
             TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
             
request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ba49d6da8e6..ac342672638 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1230,7 +1230,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
     private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, 
String clientIp) throws UserException {
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (Strings.isNullOrEmpty(request.getToken())) {
             checkSingleTablePasswordAndPrivs(request.getUser(), 
request.getPasswd(), request.getDb(),
                     request.getTbl(),
@@ -1464,7 +1464,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
     private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws 
UserException {
         if (request.isSetAuthCode()) {
-            // CHECKSTYLE IGNORE THIS LINE
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (request.isSetToken()) {
             if (!checkToken(request.getToken())) {
                 throw new AuthenticationException("Invalid token: " + 
request.getToken());
@@ -1650,7 +1650,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     // timeout
     private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws 
UserException {
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
@@ -1791,7 +1791,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
@@ -1871,7 +1871,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
     private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws 
UserException {
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
@@ -1994,7 +1994,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
@@ -2159,7 +2159,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         ConnectContext ctx = ConnectContext.get();
 
         if (request.isSetAuthCode()) {
-            // TODO(cmy): find a way to check
+            // TODO: deprecated, removed in 3.1, use token instead.
         } else if (Strings.isNullOrEmpty(request.getToken())) {
             checkSingleTablePasswordAndPrivs(request.getUser(), 
request.getPasswd(), request.getDb(),
                     request.getTbl(),
@@ -2390,7 +2390,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
         try {
-            String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
             result.setToken(token);
         } catch (Throwable e) {
             LOG.warn("catch unknown result.", e);
@@ -2409,7 +2409,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             LOG.debug("receive check token request from client: {}", 
clientAddr);
         }
         try {
-            return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+            return Env.getCurrentEnv().getTokenManager().checkAuthToken(token);
         } catch (Throwable e) {
             LOG.warn("catch unknown result.", e);
             return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index fb6853e83c3..d7eff484c6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -250,6 +250,7 @@ public class HeartbeatMgr extends MasterDaemon {
                 copiedMasterInfo.setHeartbeatFlags(flags);
                 copiedMasterInfo.setBackendId(backendId);
                 copiedMasterInfo.setFrontendInfos(feInfos);
+                
copiedMasterInfo.setAuthToken(Env.getCurrentEnv().getTokenManager().acquireToken());
                 if (Config.isCloudMode()) {
                     String cloudUniqueId = 
backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
                     copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index fcab55866ed..25c4ff4b3b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -225,7 +225,7 @@ public class TransactionEntry {
                                 ExecuteEnv.getInstance().getStartupTime()),
                         LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
             } else {
-                String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+                String token = 
Env.getCurrentEnv().getTokenManager().acquireToken();
                 MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(ConnectContext.get());
                 TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
                 
request.setDb(database.getFullName()).setTbl(table.getName()).setToken(token)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
index 13ae9b6e44e..2f8f1437344 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.catalog.TokenManager;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 77e49e6c1fa..7078d5eaeab 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -645,12 +645,12 @@ struct TLoadTxnBeginRequest {
     6: optional string user_ip
     7: required string label
     8: optional i64 timestamp   // deprecated, use request_id instead
-    9: optional i64 auth_code
+    9: optional i64 auth_code // deprecated, use token instead
     // The real value of timeout should be i32. i64 ensures the compatibility of interface.
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
     12: optional string token
-    13: optional string auth_code_uuid
+    13: optional string auth_code_uuid // deprecated, use token instead
     14: optional i64 table_id
     15: optional i64 backend_id
 }
@@ -670,7 +670,7 @@ struct TBeginTxnRequest {
     5: optional list<i64> table_ids
     6: optional string user_ip
     7: optional string label
-    8: optional i64 auth_code
+    8: optional i64 auth_code // deprecated, use token instead
     // The real value of timeout should be i32. i64 ensures the compatibility of interface.
     9: optional i64 timeout
     10: optional Types.TUniqueId request_id
@@ -718,7 +718,7 @@ struct TStreamLoadPutRequest {
     14: optional string columnSeparator
 
     15: optional string partitions
-    16: optional i64 auth_code
+    16: optional i64 auth_code // deprecated, use token instead
     17: optional bool negative
     18: optional i32 timeout
     19: optional bool strictMode
@@ -831,14 +831,14 @@ struct TLoadTxnCommitRequest {
     7: required i64 txnId
     8: required bool sync
     9: optional list<Types.TTabletCommitInfo> commitInfos
-    10: optional i64 auth_code
+    10: optional i64 auth_code // deprecated, use token instead
     11: optional TTxnCommitAttachment txnCommitAttachment
     12: optional i64 thrift_rpc_timeout_ms
     13: optional string token
     14: optional i64 db_id
     15: optional list<string> tbls
     16: optional i64 table_id
-    17: optional string auth_code_uuid
+    17: optional string auth_code_uuid // deprecated, use token instead
     18: optional bool groupCommit
     19: optional i64 receiveBytes
     20: optional i64 backendId 
@@ -856,7 +856,7 @@ struct TCommitTxnRequest {
     5: optional string user_ip
     6: optional i64 txn_id
     7: optional list<Types.TTabletCommitInfo> commit_infos
-    8: optional i64 auth_code
+    8: optional i64 auth_code // deprecated, use token instead
     9: optional TTxnCommitAttachment txn_commit_attachment
     10: optional i64 thrift_rpc_timeout_ms
     11: optional string token
@@ -879,13 +879,13 @@ struct TLoadTxn2PCRequest {
     5: optional string user_ip
     6: optional i64 txnId
     7: optional string operation
-    8: optional i64 auth_code
+    8: optional i64 auth_code // deprecated, use token instead
     9: optional string token
     10: optional i64 thrift_rpc_timeout_ms
     11: optional string label
 
     // For cloud
-    1000: optional string auth_code_uuid
+    1000: optional string auth_code_uuid // deprecated, use token instead
 }
 
 struct TLoadTxn2PCResult {
@@ -900,7 +900,7 @@ struct TRollbackTxnRequest {
     5: optional string user_ip
     6: optional i64 txn_id
     7: optional string reason
-    9: optional i64 auth_code
+    9: optional i64 auth_code // deprecated, use token instead
     10: optional TTxnCommitAttachment txn_commit_attachment
     11: optional string token
     12: optional i64 db_id
@@ -920,12 +920,12 @@ struct TLoadTxnRollbackRequest {
     6: optional string user_ip
     7: required i64 txnId
     8: optional string reason
-    9: optional i64 auth_code
+    9: optional i64 auth_code // deprecated, use token instead
     10: optional TTxnCommitAttachment txnCommitAttachment
     11: optional string token
     12: optional i64 db_id
     13: optional list<string> tbls
-    14: optional string auth_code_uuid
+    14: optional string auth_code_uuid // deprecated, use token instead
     15: optional string label
 }
 
diff --git a/gensrc/thrift/HeartbeatService.thrift 
b/gensrc/thrift/HeartbeatService.thrift
index acdc608f21b..47c41650b78 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -43,6 +43,7 @@ struct TMasterInfo {
     11: optional string cloud_unique_id;
     // See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning
     12: optional i64 tablet_report_inactive_duration_ms;
+    13: optional string auth_token;
 }
 
 struct TBackendInfo {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to