This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 478b36edb75 branch-3.0: [opt](http) enable auth token with BE http request (#43659) 478b36edb75 is described below commit 478b36edb75e669da433c2bcd9d3516ae9f3d41a Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Nov 12 16:10:16 2024 +0800 branch-3.0: [opt](http) enable auth token with BE http request (#43659) Cherry-picked from #41994 Co-authored-by: Mingyu Chen (Rayner) <morning...@163.com> Co-authored-by: morningman <yun...@selectdb.com> --- be/src/agent/agent_server.cpp | 34 ++++++------ be/src/agent/agent_server.h | 8 +-- be/src/agent/heartbeat_server.cpp | 63 +++++++++++++--------- be/src/agent/heartbeat_server.h | 13 ++--- be/src/agent/task_worker_pool.cpp | 40 +++++++------- be/src/agent/task_worker_pool.h | 16 +++--- be/src/agent/utils.cpp | 49 ++++++++--------- be/src/agent/utils.h | 10 ++-- be/src/cloud/cloud_meta_mgr.cpp | 2 +- be/src/common/utils.h | 3 +- .../schema_active_queries_scanner.cpp | 2 +- .../schema_scanner/schema_partitions_scanner.cpp | 2 +- .../exec/schema_scanner/schema_routine_scanner.cpp | 4 +- .../schema_table_options_scanner.cpp | 2 +- .../schema_table_properties_scanner.cpp | 2 +- .../schema_workload_group_privileges.cpp | 4 +- .../schema_workload_groups_scanner.cpp | 4 +- .../schema_workload_sched_policy_scanner.cpp | 4 +- be/src/http/action/compaction_score_action.cpp | 2 +- be/src/http/action/http_stream.cpp | 8 +-- be/src/http/action/stream_load.cpp | 2 +- be/src/http/http_client.cpp | 4 ++ be/src/http/http_client.h | 8 +++ be/src/http/http_common.h | 2 +- be/src/http/http_handler_with_auth.cpp | 19 ++++++- be/src/http/http_headers.cpp | 3 +- be/src/http/http_headers.h | 3 +- be/src/http/utils.cpp | 11 +++- be/src/io/fs/multi_table_pipe.cpp | 2 +- be/src/olap/olap_server.cpp | 6 +-- be/src/olap/task/engine_clone_task.cpp | 6 +-- be/src/olap/task/engine_clone_task.h | 6 +-- be/src/olap/wal/wal_manager.cpp | 6 +-- be/src/olap/wal/wal_table.cpp | 14 +++-- be/src/runtime/cluster_info.h | 48 +++++++++++++++++ be/src/runtime/exec_env.cpp | 15 ++++-- be/src/runtime/exec_env.h | 12 +++-- be/src/runtime/exec_env_init.cpp | 6 +-- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/group_commit_mgr.cpp | 26 +++++---- .../routine_load/routine_load_task_executor.cpp | 2 + be/src/runtime/runtime_query_statistics_mgr.cpp | 6 +-- be/src/runtime/small_file_mgr.cpp | 6 +-- be/src/runtime/snapshot_loader.cpp | 2 +- .../runtime/stream_load/stream_load_executor.cpp | 12 ++--- .../workload_group/workload_group_manager.cpp | 2 +- be/src/service/backend_service.cpp | 2 +- be/src/service/doris_main.cpp | 8 ++- be/src/service/internal_service.cpp | 2 +- be/src/vec/exec/scan/vmeta_scanner.cpp | 2 +- be/src/vec/sink/autoinc_buffer.cpp | 4 +- be/src/vec/sink/vrow_distribution.cpp | 4 +- be/test/agent/heartbeat_server_test.cpp | 12 ++--- be/test/agent/mock_utils.h | 2 +- be/test/agent/task_worker_pool_test.cpp | 7 +-- be/test/http/http_client_test.cpp | 34 +++++++++++- be/test/olap/wal/wal_manager_test.cpp | 12 ++--- .../runtime/routine_load_task_executor_test.cpp | 6 +-- be/test/vec/exec/vfile_scanner_exception_test.cpp | 13 ++--- be/test/vec/exec/vwal_scanner_test.cpp | 13 ++--- .../main/java/org/apache/doris/catalog/Env.java | 8 +++ .../{load/loadv2 => catalog}/TokenManager.java | 10 +++- .../org/apache/doris/httpv2/rest/LoadAction.java | 2 +- .../org/apache/doris/load/loadv2/LoadManager.java | 9 +--- .../apache/doris/load/loadv2/MysqlLoadManager.java | 6 +-- .../doris/load/sync/canal/CanalSyncChannel.java | 2 +- .../trees/plans/commands/insert/InsertUtils.java | 2 +- .../org/apache/doris/plugin/audit/AuditLoader.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 4 +- .../apache/doris/service/FrontendServiceImpl.java | 18 +++---- .../java/org/apache/doris/system/HeartbeatMgr.java | 1 + .../apache/doris/transaction/TransactionEntry.java | 2 +- .../apache/doris/load/loadv2/TokenManagerTest.java | 1 + gensrc/thrift/FrontendService.thrift | 24 ++++----- gensrc/thrift/HeartbeatService.thrift | 1 + 75 files changed, 442 insertions(+), 276 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 361a8ab93a9..0b17f3782e7 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -49,9 +49,9 @@ using std::vector; namespace doris { -AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) - : _master_info(master_info), _topic_subscriber(new TopicSubscriber()) { - MasterServerClient::create(master_info); +AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info) + : _cluster_info(cluster_info), _topic_subscriber(new TopicSubscriber()) { + MasterServerClient::create(cluster_info); #if !defined(BE_TEST) && !defined(__APPLE__) // Add subscriber here and register listeners @@ -170,7 +170,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); }); _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>( - "CLONE", config::clone_worker_count, [&engine, &master_info = _master_info](auto&& task) { return clone_callback(engine, master_info, task); }); + "CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); }); _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>( "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); }); @@ -188,13 +188,13 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); })); + "REPORT_TASK", _cluster_info, config::report_task_interval_seconds, [&cluster_info = _cluster_info] { report_task_callback(cluster_info); })); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); + "REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_disk_callback(engine, cluster_info); })); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); })); + "REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info] { report_tablet_callback(engine, cluster_info); })); // clang-format on } @@ -217,18 +217,20 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_ "DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); }); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_TASK", _master_info, config::report_task_interval_seconds, - [&master_info = _master_info] { report_task_callback(master_info); })); + "REPORT_TASK", _cluster_info, config::report_task_interval_seconds, + [&cluster_info = _cluster_info] { report_task_callback(cluster_info); })); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, - [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); + "REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, + [&engine, &cluster_info = _cluster_info] { + report_disk_callback(engine, cluster_info); + })); if (config::enable_cloud_tablet_report) { _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds, - [&engine, &master_info = _master_info] { - report_tablet_callback(engine, master_info); + "REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds, + [&engine, &cluster_info = _cluster_info] { + report_tablet_callback(engine, cluster_info); })); } } @@ -239,8 +241,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const std::vector<TAgentTaskRequest>& tasks) { Status ret_st; - // TODO check master_info here if it is the same with that of heartbeat rpc - if (_master_info.network_address.hostname.empty() || _master_info.network_address.port == 0) { + // TODO check cluster_info here if it is the same with that of heartbeat rpc + if (_cluster_info->master_fe_addr.hostname.empty() || _cluster_info->master_fe_addr.port == 0) { Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet"); ret_st.to_thrift(&agent_result.status); return; diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 5a7fbafb72e..e5b5b522ba0 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -35,7 +35,7 @@ class ExecEnv; class TAgentPublishRequest; class TAgentResult; class TAgentTaskRequest; -class TMasterInfo; +class ClusterInfo; class TSnapshotRequest; class StorageEngine; class CloudStorageEngine; @@ -43,7 +43,7 @@ class CloudStorageEngine; // Each method corresponds to one RPC from FE Master, see BackendService. class AgentServer { public: - explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info); + explicit AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info); ~AgentServer(); void start_workers(StorageEngine& engine, ExecEnv* exec_env); @@ -63,8 +63,8 @@ public: void stop_report_workers(); private: - // Reference to the ExecEnv::_master_info - const TMasterInfo& _master_info; + // Reference to the ExecEnv::_cluster_info + const ClusterInfo* _cluster_info; std::unordered_map<int64_t /* TTaskType */, std::unique_ptr<TaskWorkerPoolIf>> _workers; diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 78002ed08fe..11345ea06f0 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -31,6 +31,7 @@ #include "common/config.h" #include "common/status.h" #include "olap/storage_engine.h" +#include "runtime/cluster_info.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/heartbeat_flags.h" @@ -49,15 +50,15 @@ class TProcessor; namespace doris { -HeartbeatServer::HeartbeatServer(TMasterInfo* master_info) +HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info) : _engine(ExecEnv::GetInstance()->storage_engine()), - _master_info(master_info), + _cluster_info(cluster_info), _fe_epoch(0) { _be_epoch = GetCurrentTimeMicros() / 1000; } void HeartbeatServer::init_cluster_id() { - _master_info->cluster_id = _engine.effective_cluster_id(); + _cluster_info->cluster_id = _engine.effective_cluster_id(); } void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, @@ -65,7 +66,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, //print heartbeat in every minute LOG_EVERY_N(INFO, 12) << "get heartbeat from FE." << "host:" << master_info.network_address.hostname - << ", port:" << master_info.network_address.port + << ", rpc port:" << master_info.network_address.port << ", cluster id:" << master_info.cluster_id << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos) << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch; @@ -108,22 +109,23 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { std::lock_guard<std::mutex> lk(_hb_mtx); // Check cluster id - if (_master_info->cluster_id == -1) { + if (_cluster_info->cluster_id == -1) { LOG(INFO) << "get first heartbeat. update cluster id"; // write and update cluster id RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id)); - _master_info->cluster_id = master_info.cluster_id; + _cluster_info->cluster_id = master_info.cluster_id; LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname << ". port: " << master_info.network_address.port << ". cluster id: " << master_info.cluster_id << ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos); } else { - if (_master_info->cluster_id != master_info.cluster_id) { + if (_cluster_info->cluster_id != master_info.cluster_id) { return Status::InternalError( "invalid cluster id. ignore. Record cluster id ={}, record frontend info {}. " "Invalid cluster_id={}, invalid frontend info {}", - _master_info->cluster_id, PrintFrontendInfos(_master_info->frontend_infos), + _cluster_info->cluster_id, + PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()), master_info.cluster_id, PrintFrontendInfos(master_info.frontend_infos)); } } @@ -183,22 +185,22 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { } bool need_report = false; - if (_master_info->network_address.hostname != master_info.network_address.hostname || - _master_info->network_address.port != master_info.network_address.port) { + if (_cluster_info->master_fe_addr.hostname != master_info.network_address.hostname || + _cluster_info->master_fe_addr.port != master_info.network_address.port) { if (master_info.epoch > _fe_epoch) { - _master_info->network_address.hostname = master_info.network_address.hostname; - _master_info->network_address.port = master_info.network_address.port; + _cluster_info->master_fe_addr.hostname = master_info.network_address.hostname; + _cluster_info->master_fe_addr.port = master_info.network_address.port; _fe_epoch = master_info.epoch; need_report = true; LOG(INFO) << "master change. new master host: " - << _master_info->network_address.hostname - << ". port: " << _master_info->network_address.port + << _cluster_info->master_fe_addr.hostname + << ". port: " << _cluster_info->master_fe_addr.port << ". epoch: " << _fe_epoch; } else { return Status::InternalError( "epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local " "epoch: {}, received epoch: {}", - _master_info->network_address.hostname, _master_info->network_address.port, + _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port, _fe_epoch, master_info.epoch); } } else { @@ -211,16 +213,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { } if (master_info.__isset.token) { - if (!_master_info->__isset.token) { - _master_info->__set_token(master_info.token); - LOG(INFO) << "get token. token: " << _master_info->token; - } else if (_master_info->token != master_info.token) { - return Status::InternalError("invalid token"); + if (_cluster_info->token == "") { + _cluster_info->token = master_info.token; + LOG(INFO) << "get token. token: " << _cluster_info->token; + } else if (_cluster_info->token != master_info.token) { + return Status::InternalError("invalid token. local: {}, master: {}", + _cluster_info->token, master_info.token); } } if (master_info.__isset.http_port) { - _master_info->__set_http_port(master_info.http_port); + _cluster_info->master_fe_http_port = master_info.http_port; } if (master_info.__isset.heartbeat_flags) { @@ -229,7 +232,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { } if (master_info.__isset.backend_id) { - _master_info->__set_backend_id(master_info.backend_id); + _cluster_info->backend_id = master_info.backend_id; BackendOptions::set_backend_id(master_info.backend_id); } if (master_info.__isset.frontend_infos) { @@ -281,6 +284,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { master_info.tablet_report_inactive_duration_ms; } + if (master_info.__isset.auth_token) { + if (_cluster_info->curr_auth_token == "") { + _cluster_info->curr_auth_token = master_info.auth_token; + LOG(INFO) << "set new auth token: " << master_info.auth_token; + } else if (_cluster_info->curr_auth_token != master_info.auth_token) { + LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token + << "set new auth token: " << master_info.auth_token; + _cluster_info->last_auth_token = _cluster_info->curr_auth_token; + _cluster_info->curr_auth_token = master_info.auth_token; + } + } + if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; _engine.notify_listeners(); @@ -291,8 +306,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port, std::unique_ptr<ThriftServer>* thrift_server, - uint32_t worker_thread_num, TMasterInfo* local_master_info) { - HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info); + uint32_t worker_thread_num, ClusterInfo* cluster_info) { + HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info); if (heartbeat_server == nullptr) { return Status::InternalError("Get heartbeat server failed"); } diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index 29fecd07881..67d67fd486d 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -26,6 +26,7 @@ #include "common/status.h" namespace doris { +class ClusterInfo; class ExecEnv; class THeartbeatResult; class TMasterInfo; @@ -36,7 +37,7 @@ class ThriftServer; class HeartbeatServer : public HeartbeatServiceIf { public: - explicit HeartbeatServer(TMasterInfo* master_info); + explicit HeartbeatServer(ClusterInfo* cluster_info); ~HeartbeatServer() override = default; void init_cluster_id(); @@ -44,7 +45,7 @@ public: // Master send heartbeat to this server // // Input parameters: - // * master_info: The struct of master info, contains host ip and port + // * master_info: The struct of master info, contains cluster info from Master FE // // Output parameters: // * heartbeat_result: The result of heartbeat set @@ -56,10 +57,10 @@ private: BaseStorageEngine& _engine; int64_t _be_epoch; - // mutex to protect master_info and _epoch + // mutex to protect cluster_info and _epoch std::mutex _hb_mtx; - // Not owned. Point to the ExecEnv::_master_info - TMasterInfo* _master_info = nullptr; + // Not owned. Point to the ExecEnv::_cluster_info + ClusterInfo* _cluster_info = nullptr; int64_t _fe_epoch; DISALLOW_COPY_AND_ASSIGN(HeartbeatServer); @@ -67,5 +68,5 @@ private: Status create_heartbeat_server(ExecEnv* exec_env, uint32_t heartbeat_server_port, std::unique_ptr<ThriftServer>* heart_beat_server, - uint32_t worker_thread_num, TMasterInfo* local_master_info); + uint32_t worker_thread_num, ClusterInfo* cluster_info); } // namespace doris diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 4614304e439..01b107b3ea7 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -378,22 +378,22 @@ Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateR } // Return `true` if report success -bool handle_report(const TReportRequest& request, const TMasterInfo& master_info, +bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info, std::string_view name) { TMasterResult result; Status status = MasterServerClient::instance()->report(request, &result); if (!status.ok()) [[unlikely]] { LOG_WARNING("failed to report {}", name) - .tag("host", master_info.network_address.hostname) - .tag("port", master_info.network_address.port) + .tag("host", cluster_info->master_fe_addr.hostname) + .tag("port", cluster_info->master_fe_addr.port) .error(status); return false; } else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { LOG_WARNING("failed to report {}", name) - .tag("host", master_info.network_address.hostname) - .tag("port", master_info.network_address.port) + .tag("host", cluster_info->master_fe_addr.hostname) + .tag("port", cluster_info->master_fe_addr.port) .error(result.status); return false; } @@ -663,10 +663,10 @@ void PriorTaskWorkerPool::high_prior_loop() { } } -ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, int report_interval_s, +ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s, std::function<void()> callback) : _name(std::move(name)) { - auto report_loop = [this, &master_info, report_interval_s, callback = std::move(callback)] { + auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] { auto& engine = ExecEnv::GetInstance()->storage_engine(); engine.register_report_listener(this); while (true) { @@ -685,7 +685,7 @@ ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, int } } - if (master_info.network_address.port == 0) { + if (cluster_info->master_fe_addr.port == 0) { // port == 0 means not received heartbeat yet LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report"; continue; @@ -990,7 +990,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& remove_task_info(req.task_type, req.signature); } -void report_task_callback(const TMasterInfo& master_info) { +void report_task_callback(const ClusterInfo* cluster_info) { TReportRequest request; if (config::report_random_wait) { random_sleep(5); @@ -1007,14 +1007,14 @@ void report_task_callback(const TMasterInfo& master_info) { } } request.__set_backend(BackendOptions::get_local_backend()); - bool succ = handle_report(request, master_info, "task"); + bool succ = handle_report(request, cluster_info, "task"); report_task_total << 1; if (!succ) [[unlikely]] { report_task_failed << 1; } } -void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) { +void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) { TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); request.__isset.disks = true; @@ -1039,14 +1039,14 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 ? config::pipeline_executor_size : CpuInfo::num_cores()); - bool succ = handle_report(request, master_info, "disk"); + bool succ = handle_report(request, cluster_info, "disk"); report_disk_total << 1; if (!succ) [[unlikely]] { report_disk_failed << 1; } } -void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) { +void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { // Random sleep 1~5 seconds before doing report. // In order to avoid the problem that the FE receives many report requests at the same time // and can not be processed. @@ -1066,12 +1066,12 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_ request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 ? config::pipeline_executor_size : CpuInfo::num_cores()); - bool succ = handle_report(request, master_info, "disk"); + bool succ = handle_report(request, cluster_info, "disk"); report_disk_total << 1; report_disk_failed << !succ; } -void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) { +void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) { if (config::report_random_wait) { random_sleep(5); } @@ -1133,14 +1133,14 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf } request.__isset.resource = true; - bool succ = handle_report(request, master_info, "tablet"); + bool succ = handle_report(request, cluster_info, "tablet"); report_tablet_total << 1; if (!succ) [[unlikely]] { report_tablet_failed << 1; } } -void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) { +void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { // Random sleep 1~5 seconds before doing report. // In order to avoid the problem that the FE receives many report requests at the same time // and can not be processed. @@ -1173,7 +1173,7 @@ void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& maste request.__set_report_version(report_version); request.__set_num_tablets(total_num_tablets); - bool succ = handle_report(request, master_info, "tablet"); + bool succ = handle_report(request, cluster_info, "tablet"); report_tablet_total << 1; if (!succ) [[unlikely]] { report_tablet_failed << 1; @@ -2013,7 +2013,7 @@ void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& re visible_version_req.partition_version); } -void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, +void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, const TAgentTaskRequest& req) { const auto& clone_req = req.clone_req; @@ -2021,7 +2021,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, LOG(INFO) << "get clone task. signature=" << req.signature; std::vector<TTabletInfo> tablet_infos; - EngineCloneTask engine_task(engine, clone_req, master_info, req.signature, &tablet_infos); + EngineCloneTask engine_task(engine, clone_req, cluster_info, req.signature, &tablet_infos); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); auto status = engine_task.execute(); // Return result to fe diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index c50ac57ffe9..f6223affd07 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -36,10 +36,10 @@ class StorageEngine; class CloudStorageEngine; class Thread; class ThreadPool; -class TMasterInfo; class TReportRequest; class TTabletInfo; class TAgentTaskRequest; +class ClusterInfo; class TaskWorkerPoolIf { public: @@ -109,7 +109,7 @@ private: class ReportWorker { public: - ReportWorker(std::string name, const TMasterInfo& master_info, int report_interval_s, + ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s, std::function<void()> callback); ~ReportWorker(); @@ -169,7 +169,7 @@ void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req); void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); -void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, +void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, const TAgentTaskRequest& req); void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req); @@ -182,15 +182,15 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req); void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req); -void report_task_callback(const TMasterInfo& master_info); +void report_task_callback(const ClusterInfo* cluster_info); -void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info); +void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info); -void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info); +void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info); -void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info); +void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info); -void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info); +void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info); void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp index 71e7aedbc35..2a1f9994b01 100644 --- a/be/src/agent/utils.cpp +++ b/be/src/agent/utils.cpp @@ -43,6 +43,7 @@ #include "common/config.h" #include "common/status.h" #include "runtime/client_cache.h" +#include "runtime/cluster_info.h" namespace doris { class TConfirmUnusedRemoteFilesRequest; @@ -61,8 +62,8 @@ namespace doris { static FrontendServiceClientCache s_client_cache; static std::unique_ptr<MasterServerClient> s_client; -MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info) { - s_client.reset(new MasterServerClient(master_info)); +MasterServerClient* MasterServerClient::create(const ClusterInfo* cluster_info) { + s_client.reset(new MasterServerClient(cluster_info)); return s_client.get(); } @@ -70,18 +71,18 @@ MasterServerClient* MasterServerClient::instance() { return s_client.get(); } -MasterServerClient::MasterServerClient(const TMasterInfo& master_info) - : _master_info(master_info) {} +MasterServerClient::MasterServerClient(const ClusterInfo* cluster_info) + : _cluster_info(cluster_info) {} Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMasterResult* result) { Status client_status; - FrontendServiceConnection client(&s_client_cache, _master_info.network_address, + FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr, config::thrift_rpc_timeout_ms, &client_status); if (!client_status.ok()) { LOG(WARNING) << "fail to get master client from cache. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", code=" << client_status.code(); return Status::InternalError("Failed to get master client"); } @@ -97,8 +98,8 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste if (!client_status.ok()) { #ifdef ADDRESS_SANITIZER LOG(WARNING) << "fail to get master client from cache. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", code=" << client_status.code(); #endif return Status::RpcError("Master client finish task failed"); @@ -108,8 +109,8 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste } catch (std::exception& e) { RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms)); LOG(WARNING) << "fail to finish_task. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port << ", error=" << e.what(); + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", error=" << e.what(); return Status::InternalError("Fail to finish task"); } @@ -118,13 +119,13 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) { Status client_status; - FrontendServiceConnection client(&s_client_cache, _master_info.network_address, + FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr, config::thrift_rpc_timeout_ms, &client_status); if (!client_status.ok()) { LOG(WARNING) << "fail to get master client from cache. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", code=" << client_status; return Status::InternalError("Fail to get master client from cache"); } @@ -144,8 +145,8 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult* if (!client_status.ok()) { #ifdef ADDRESS_SANITIZER LOG(WARNING) << "fail to get master client from cache. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", code=" << client_status.code(); #endif return Status::InternalError("Fail to get master client from cache"); @@ -164,8 +165,8 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult* } catch (std::exception& e) { RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms)); LOG(WARNING) << "fail to report to master. " - << "host=" << _master_info.network_address.hostname - << ", port=" << _master_info.network_address.port + << "host=" << _cluster_info->master_fe_addr.hostname + << ", port=" << _cluster_info->master_fe_addr.port << ", code=" << client_status.code() << ", reason=" << e.what(); return Status::InternalError("Fail to report to master"); } @@ -176,13 +177,13 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult* Status MasterServerClient::confirm_unused_remote_files( const TConfirmUnusedRemoteFilesRequest& request, TConfirmUnusedRemoteFilesResult* result) { Status client_status; - FrontendServiceConnection client(&s_client_cache, _master_info.network_address, + FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr, config::thrift_rpc_timeout_ms, &client_status); if (!client_status.ok()) { return Status::InternalError( "fail to get master client from cache. host={}, port={}, code={}", - _master_info.network_address.hostname, _master_info.network_address.port, + _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port, client_status.code()); } try { @@ -198,8 +199,8 @@ Status MasterServerClient::confirm_unused_remote_files( if (!client_status.ok()) { return Status::InternalError( "fail to get master client from cache. host={}, port={}, code={}", - _master_info.network_address.hostname, - _master_info.network_address.port, client_status.code()); + _cluster_info->master_fe_addr.hostname, + _cluster_info->master_fe_addr.port, client_status.code()); } client->confirmUnusedRemoteFiles(*result, request); @@ -208,7 +209,7 @@ Status MasterServerClient::confirm_unused_remote_files( // actually we don't care what FE returns. return Status::InternalError( "fail to confirm unused remote files. host={}, port={}, code={}, reason={}", - _master_info.network_address.hostname, _master_info.network_address.port, + _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port, client_status.code(), e.what()); } } @@ -216,7 +217,7 @@ Status MasterServerClient::confirm_unused_remote_files( RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms)); return Status::InternalError( "fail to confirm unused remote files. host={}, port={}, code={}, reason={}", - _master_info.network_address.hostname, _master_info.network_address.port, + _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port, client_status.code(), e.what()); } diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h index eea8f9f8cff..d554e92ff67 100644 --- a/be/src/agent/utils.h +++ b/be/src/agent/utils.h @@ -28,13 +28,13 @@ namespace doris { class TConfirmUnusedRemoteFilesRequest; class TConfirmUnusedRemoteFilesResult; class TFinishTaskRequest; -class TMasterInfo; class TMasterResult; class TReportRequest; +class ClusterInfo; class MasterServerClient { public: - static MasterServerClient* create(const TMasterInfo& master_info); + static MasterServerClient* create(const ClusterInfo* cluster_info); static MasterServerClient* instance(); ~MasterServerClient() = default; @@ -61,12 +61,12 @@ public: TConfirmUnusedRemoteFilesResult* result); private: - MasterServerClient(const TMasterInfo& master_info); + MasterServerClient(const ClusterInfo* cluster_info); DISALLOW_COPY_AND_ASSIGN(MasterServerClient); - // Not owner. Reference to the ExecEnv::_master_info - const TMasterInfo& _master_info; + // Not owner. Reference to the ExecEnv::_cluster_info + const ClusterInfo* _cluster_info; }; class AgentUtils { diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 026610d5b0c..fefdb65e44b 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -837,7 +837,7 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, Status status; int64_t duration_ns = 0; TNetworkAddress master_addr = - ExecEnv::GetInstance()->master_info()->network_address; + ExecEnv::GetInstance()->cluster_info()->master_fe_addr; if (master_addr.hostname.empty() || master_addr.port == 0) { status = Status::Error<SERVICE_UNAVAILABLE>( "Have not get FE Master heartbeat yet"); diff --git a/be/src/common/utils.h b/be/src/common/utils.h index 46df44a40f2..92c5974e4a7 100644 --- a/be/src/common/utils.h +++ b/be/src/common/utils.h @@ -31,13 +31,14 @@ struct AuthInfo { std::string cluster; std::string user_ip; // -1 as unset - int64_t auth_code = -1; + int64_t auth_code = -1; // deprecated std::string token; }; template <class T> void set_request_auth(T* req, const AuthInfo& auth) { req->user = auth.user; // always set user, because it may be used by FE + // auth code is deprecated and should be removed in 3.1 if (auth.auth_code != -1) { // if auth_code is set, no need to set other info req->__set_auth_code(auth.auth_code); diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index 6f66d9a8fd5..98051638026 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -51,7 +51,7 @@ Status SchemaActiveQueriesScanner::start(RuntimeState* state) { } Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp index ebe2bd3b70e..459715fd628 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp @@ -98,7 +98,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) { } Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp index e8d95f0abd6..8660d75e8a1 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -62,7 +62,7 @@ Status SchemaRoutinesScanner::start(RuntimeState* state) { } Status SchemaRoutinesScanner::get_block_from_fe() { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { schema_table_request_params.__isset.columns_name = true; @@ -138,4 +138,4 @@ Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block, return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp index f4b636be68f..bb778996a83 100644 --- a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp @@ -70,7 +70,7 @@ Status SchemaTableOptionsScanner::start(RuntimeState* state) { } Status SchemaTableOptionsScanner::get_onedb_info_from_fe(int64_t dbId) { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { diff --git a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp index 749113da1b5..8d6a26a552f 100644 --- a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp @@ -67,7 +67,7 @@ Status SchemaTablePropertiesScanner::start(RuntimeState* state) { } Status SchemaTablePropertiesScanner::get_onedb_info_from_fe(int64_t dbId) { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { diff --git a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp index a1d4568d905..a91a28322ec 100644 --- a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp +++ b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp @@ -45,7 +45,7 @@ Status SchemaWorkloadGroupPrivilegesScanner::start(RuntimeState* state) { } Status SchemaWorkloadGroupPrivilegesScanner::_get_workload_group_privs_block_from_fe() { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { @@ -134,4 +134,4 @@ Status SchemaWorkloadGroupPrivilegesScanner::get_next_block_internal(vectorized: return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index dd81a3ecb26..43562a8f52c 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -58,7 +58,7 @@ Status SchemaWorkloadGroupsScanner::start(RuntimeState* state) { } Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { @@ -144,4 +144,4 @@ Status SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* b return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp index 2d91f151f5f..5c6a6f70a88 100644 --- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp @@ -49,7 +49,7 @@ Status SchemaWorkloadSchedulePolicyScanner::start(RuntimeState* state) { } Status SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_from_fe() { - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TSchemaTableRequestParams schema_table_request_params; for (int i = 0; i < _s_tbls_columns.size(); i++) { @@ -135,4 +135,4 @@ Status SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized:: return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/http/action/compaction_score_action.cpp b/be/src/http/action/compaction_score_action.cpp index 10b8cc6bdba..eeb0d0a642c 100644 --- a/be/src/http/action/compaction_score_action.cpp +++ b/be/src/http/action/compaction_score_action.cpp @@ -166,7 +166,7 @@ CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier:: _accessor(std::make_unique<CloudCompactionScoresAccessor>(tablet_mgr)) {} void CompactionScoreAction::handle(HttpRequest* req) { - req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data()); + req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JSON_TYPE.data()); auto top_n_param = req->param(TOP_N); size_t top_n = DEFAULT_TOP_N; diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 4a34605aa33..99205368616 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -325,14 +325,14 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, request.__set_group_commit_mode("sync_mode"); } } - if (_exec_env->master_info()->__isset.backend_id) { - request.__set_backend_id(_exec_env->master_info()->backend_id); + if (_exec_env->cluster_info()->backend_id != 0) { + request.__set_backend_id(_exec_env->cluster_info()->backend_id); } else { - LOG(WARNING) << "_exec_env->master_info not set backend_id"; + LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; } // plan this load - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; int64_t stream_load_put_start_time = MonotonicNanos(); RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index fa23e5e56c1..e3ad7f44866 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -666,7 +666,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, #ifndef BE_TEST // plan this load - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; int64_t stream_load_put_start_time = MonotonicNanos(); RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index bf1cd751ae3..c842a4fe2dd 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -26,6 +26,7 @@ #include "common/config.h" #include "http/http_headers.h" #include "http/http_status.h" +#include "runtime/exec_env.h" #include "util/stack_util.h" namespace doris { @@ -141,6 +142,9 @@ Status HttpClient::init(const std::string& url, bool set_fail_on_error) { return Status::InternalError("fail to set CURLOPT_URL"); } +#ifndef BE_TEST + set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token); +#endif return Status::OK(); } diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index f6a1a17ec29..fb692c50268 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -26,6 +26,7 @@ #include <string> #include "common/status.h" +#include "http/http_headers.h" #include "http/http_method.h" namespace doris { @@ -55,6 +56,13 @@ public: curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str()); } + // Auth-Token: xxxx + void set_auth_token(const std::string& token) { + std::string scratch_str = HttpHeaders::AUTH_TOKEN + ": " + token; + _header_list = curl_slist_append(_header_list, scratch_str.c_str()); + curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list); + } + // content_type such as "application/json" void set_content_type(const std::string content_type) { std::string scratch_str = "Content-Type: " + content_type; diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index ec2dfc896e4..b9b5f0d85ae 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -66,7 +66,7 @@ static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation"; static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node"; static const std::string HTTP_LOAD_STREAM_PER_NODE = "load_stream_per_node"; static const std::string HTTP_WAL_ID_KY = "wal_id"; -static const std::string HTTP_AUTH_CODE = "auth_code"; +static const std::string HTTP_AUTH_CODE = "auth_code"; // deprecated static const std::string HTTP_GROUP_COMMIT = "group_commit"; static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster"; diff --git a/be/src/http/http_handler_with_auth.cpp b/be/src/http/http_handler_with_auth.cpp index 166638ab318..518b9868de1 100644 --- a/be/src/http/http_handler_with_auth.cpp +++ b/be/src/http/http_handler_with_auth.cpp @@ -52,6 +52,23 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) { return -1; } + // check auth by token + if (auth_info.token != "") { +#ifdef BE_TEST + if (auth_info.token == "valid_token") { + return 0; +#else + if (_exec_env->check_auth_token(auth_info.token)) { + return 0; +#endif + } else { + LOG(WARNING) << "invalid auth token, request: " << req->debug_string(); + HttpChannel::send_error(req, HttpStatus::BAD_REQUEST); + return -1; + } + } + + // check auth by user/password auth_request.user = auth_info.user; auth_request.passwd = auth_info.passwd; auth_request.__set_cluster(auth_info.cluster); @@ -65,7 +82,7 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) { } #ifndef BE_TEST - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; { auto status = ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/src/http/http_headers.cpp b/be/src/http/http_headers.cpp index 79d9240c163..4fc5338145c 100644 --- a/be/src/http/http_headers.cpp +++ b/be/src/http/http_headers.cpp @@ -93,6 +93,7 @@ const char* HttpHeaders::WEBSOCKET_ORIGIN = "WebSocket-Origin"; const char* HttpHeaders::WEBSOCKET_PROTOCOL = "WebSocket-Protocol"; const char* HttpHeaders::WWW_AUTHENTICATE = "WWW-Authenticate"; -const std::string HttpHeaders::JsonType = "application/json"; +const std::string HttpHeaders::JSON_TYPE = "application/json"; +const std::string HttpHeaders::AUTH_TOKEN = "Auth-Token"; } // namespace doris diff --git a/be/src/http/http_headers.h b/be/src/http/http_headers.h index e2f9547ac9d..0fdfc1be22c 100644 --- a/be/src/http/http_headers.h +++ b/be/src/http/http_headers.h @@ -97,7 +97,8 @@ public: static const char* WEBSOCKET_PROTOCOL; static const char* WWW_AUTHENTICATE; - static const std::string JsonType; + static const std::string JSON_TYPE; + static const std::string AUTH_TOKEN; }; } // namespace doris diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index fbbc1cd93bf..f91610476b4 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -54,6 +54,8 @@ std::string encode_basic_auth(const std::string& user, const std::string& passwd } bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* passwd) { + // const auto& token = req.header(HttpHeaders::AUTH_TOKEN); + const char k_basic[] = "Basic "; const auto& auth = req.header(HttpHeaders::AUTHORIZATION); if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) { @@ -75,8 +77,11 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa } bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) { + // deprecated, removed in 3.1, use AUTH_TOKEN const auto& token = req.header("token"); + // deprecated, removed in 3.1, use AUTH_TOKEN const auto& auth_code = req.header(HTTP_AUTH_CODE); + const auto& auth_token = req.header(HttpHeaders::AUTH_TOKEN); std::tuple<std::string, std::string, std::string> tmp; auto& [user, pass, cluster] = tmp; @@ -93,9 +98,11 @@ bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) { } if (!token.empty()) { - auth->token = token; + auth->token = token; // deprecated + } else if (!auth_token.empty()) { + auth->token = auth_token; } else if (!auth_code.empty()) { - auth->auth_code = std::stoll(auth_code); + auth->auth_code = std::stoll(auth_code); // deprecated } else if (!valid_basic_auth) { return false; } diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 70ad64a81ce..b3af2531f15 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -197,7 +197,7 @@ Status MultiTablePipe::request_and_exec_plans() { // plan this load ExecEnv* exec_env = doris::ExecEnv::GetInstance(); - TNetworkAddress master_addr = exec_env->master_info()->network_address; + TNetworkAddress master_addr = exec_env->cluster_info()->master_fe_addr; int64_t stream_load_put_start_time = MonotonicNanos(); RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 020d151d16b..96dc9295834 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -723,13 +723,13 @@ void StorageEngine::_update_replica_infos_callback() { !t->tablet_meta()->tablet_schema()->disable_auto_compaction() && t->tablet_meta()->tablet_schema()->enable_single_replica_compaction(); }); - TMasterInfo* master_info = ExecEnv::GetInstance()->master_info(); - if (master_info == nullptr) { + ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info(); + if (cluster_info == nullptr) { LOG(WARNING) << "Have not get FE Master heartbeat yet"; std::this_thread::sleep_for(std::chrono::seconds(2)); continue; } - TNetworkAddress master_addr = master_info->network_address; + TNetworkAddress master_addr = cluster_info->master_fe_addr; if (master_addr.hostname == "" || master_addr.port == 0) { LOG(WARNING) << "Have not get FE Master heartbeat yet"; std::this_thread::sleep_for(std::chrono::seconds(2)); diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 1fc5b7278c6..fc3a69fd5cd 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -143,13 +143,13 @@ Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir, } while (false) EngineCloneTask::EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_req, - const TMasterInfo& master_info, int64_t signature, + const ClusterInfo* cluster_info, int64_t signature, std::vector<TTabletInfo>* tablet_infos) : _engine(engine), _clone_req(clone_req), _tablet_infos(tablet_infos), _signature(signature), - _master_info(master_info) { + _cluster_info(cluster_info) { _mem_tracker = MemTrackerLimiter::create_shared( MemTrackerLimiter::Type::OTHER, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id)); @@ -356,7 +356,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir, bool* allow_incremental_clone) { Status status; - const auto& token = _master_info.token; + const auto& token = _cluster_info->token; int timeout_s = 0; if (_clone_req.__isset.timeout_s) { diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 3161b803c82..9290ed9552e 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -31,11 +31,11 @@ namespace doris { class DataDir; class TCloneReq; -class TMasterInfo; class TTabletInfo; class Tablet; struct Version; class StorageEngine; +class ClusterInfo; const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?"; const std::string HTTP_REQUEST_TOKEN_PARAM = "token="; @@ -51,7 +51,7 @@ public: Status execute() override; EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_req, - const TMasterInfo& master_info, int64_t signature, + const ClusterInfo* cluster_info, int64_t signature, std::vector<TTabletInfo>* tablet_infos); ~EngineCloneTask() override = default; @@ -93,7 +93,7 @@ private: const TCloneReq& _clone_req; std::vector<TTabletInfo>* _tablet_infos = nullptr; int64_t _signature; - const TMasterInfo& _master_info; + const ClusterInfo* _cluster_info; int64_t _copy_size; int64_t _copy_time_ms; std::vector<PendingRowsetGuard> _pending_rs_guards; diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index a7e33e7383f..03343603ae8 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -213,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_ base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" - << std::to_string(wal_version) << "_" << _exec_env->master_info()->backend_id << "_" + << std::to_string(wal_version) << "_" << _exec_env->cluster_info()->backend_id << "_" << std::to_string(wal_id) << "_" << label; { std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); @@ -377,8 +377,8 @@ Status WalManager::_replay_background() { break; } // port == 0 means not received heartbeat yet - if (_exec_env->master_info() != nullptr && - _exec_env->master_info()->network_address.port == 0) { + if (_exec_env->cluster_info() != nullptr && + _exec_env->cluster_info()->master_fe_addr.port == 0) { continue; } // replay residual wal,only replay once diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 69453054d18..84cf7afd4d3 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -161,12 +161,15 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) { Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { TLoadTxnRollbackRequest request; - request.__set_auth_code(0); // this is a fake, fe not check it now + // this is a fake, fe not check it now + // should be removed in 3.1, use token instead + request.__set_auth_code(0); + request.__set_token(_exec_env->cluster_info()->curr_auth_token); request.__set_db_id(db_id); request.__set_label(label); request.__set_reason("relay wal with label " + label); TLoadTxnRollbackResult result; - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; auto st = ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, [&request, &result](FrontendServiceConnection& client) { @@ -235,7 +238,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, ctx->wal_id = wal_id; ctx->label = label; ctx->need_commit_self = false; - ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now + ctx->auth.token = _exec_env->cluster_info()->curr_auth_token; ctx->auth.user = "admin"; ctx->group_commit = false; ctx->load_type = TLoadType::MANUL_LOAD; @@ -245,6 +248,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, // wait stream load finish RETURN_IF_ERROR(ctx->future.get()); if (ctx->status.ok()) { + // deprecated and should be removed in 3.1, use token instead. ctx->auth.auth_code = wal_id; st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); } else { @@ -293,7 +297,7 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id, request.__set_table_id(tb_id); TGetColumnInfoResult result; Status status; - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; if (master_addr.hostname.empty() || master_addr.port == 0) { status = Status::InternalError<false>("Have not get FE Master heartbeat yet"); } else { @@ -327,4 +331,4 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/cluster_info.h b/be/src/runtime/cluster_info.h new file mode 100644 index 00000000000..da66b4f0acf --- /dev/null +++ b/be/src/runtime/cluster_info.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/Types_types.h> + +#include <string> + +namespace doris { + +// This class is used to save the cluster info +// like cluster id, epoch, cloud_unique_id, etc. +// These info are usually in heartbeat from Master FE. +class ClusterInfo { +public: + // Unique cluster id + int32_t cluster_id = 0; + // Master FE addr: ip:rpc_port + TNetworkAddress master_fe_addr; + // Master FE http_port + int32_t master_fe_http_port = 0; + // Unique cluster token + std::string token = ""; + // Backend ID + int64_t backend_id = 0; + + // Auth token for internal authentication + // Save the last 2 tokens to avoid token invalid during token update + std::string curr_auth_token = ""; + std::string last_auth_token = ""; +}; + +} // namespace doris diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 872069ee70a..ab24d7ca192 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -65,12 +65,16 @@ Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) { } const std::string& ExecEnv::token() const { - return _master_info->token; + return _cluster_info->token; } -std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() { +std::vector<TFrontendInfo> ExecEnv::get_frontends() { std::lock_guard<std::mutex> lg(_frontends_lock); - return _frontends; + std::vector<TFrontendInfo> infos; + for (const auto& cur_fe : _frontends) { + infos.push_back(cur_fe.second.info); + } + return infos; } void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) { @@ -173,4 +177,9 @@ void ExecEnv::wait_for_all_tasks_done() { } } +bool ExecEnv::check_auth_token(const std::string& auth_token) { + return _cluster_info->curr_auth_token == auth_token || + _cluster_info->last_auth_token == auth_token; +} + } // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 307c2586fc6..4a0000fa19f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -33,6 +33,7 @@ #include "olap/rowset/segment_v2/inverted_index_writer.h" #include "olap/tablet_fwd.h" #include "pipeline/pipeline_tracing.h" +#include "runtime/cluster_info.h" #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" @@ -83,7 +84,6 @@ class BaseStorageEngine; class ResultBufferMgr; class ResultQueueMgr; class RuntimeQueryStatisticsMgr; -class TMasterInfo; class LoadChannelMgr; class LoadStreamMgr; class LoadStreamMapPool; @@ -219,7 +219,7 @@ public: UserFunctionCache* user_function_cache() { return _user_function_cache; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } ResultCache* result_cache() { return _result_cache; } - TMasterInfo* master_info() { return _master_info; } + ClusterInfo* cluster_info() { return _cluster_info; } LoadPathMgr* load_path_mgr() { return _load_path_mgr; } BfdParser* bfd_parser() const { return _bfd_parser; } BrokerMgr* broker_mgr() const { return _broker_mgr; } @@ -262,7 +262,7 @@ public: void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) { _memtable_memory_limiter.reset(limiter); } - void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } + void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; } void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) { this->_new_load_stream_mgr = new_load_stream_mgr; } @@ -299,7 +299,7 @@ public: void wait_for_all_tasks_done(); void update_frontends(const std::vector<TFrontendInfo>& new_infos); - std::map<TNetworkAddress, FrontendInfo> get_frontends(); + std::vector<TFrontendInfo> get_frontends(); std::map<TNetworkAddress, FrontendInfo> get_running_frontends(); TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; } @@ -333,6 +333,8 @@ public: orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; } arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; } + bool check_auth_token(const std::string& auth_token); + private: ExecEnv(); @@ -403,7 +405,7 @@ private: WorkloadGroupMgr* _workload_group_manager = nullptr; ResultCache* _result_cache = nullptr; - TMasterInfo* _master_info = nullptr; + ClusterInfo* _cluster_info = nullptr; LoadPathMgr* _load_path_mgr = nullptr; BfdParser* _bfd_parser = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 062069044dc..9d761786611 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -278,7 +278,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, config::query_cache_elasticity_size_mb); - _master_info = new TMasterInfo(); + _cluster_info = new ClusterInfo(); _load_path_mgr = new LoadPathMgr(this); _bfd_parser = BfdParser::create(); _broker_mgr = new BrokerMgr(this); @@ -759,9 +759,9 @@ void ExecEnv::destroy() { // Master Info is a thrift object, it could be the last one to deconstruct. // Master info should be deconstruct later than fragment manager, because fragment will - // access master_info.backend id to access some info. If there is a running query and master + // access cluster_info.backend_id to access some info. If there is a running query and master // info is deconstructed then BE process will core at coordinator back method in fragment mgr. - SAFE_DELETE(_master_info); + SAFE_DELETE(_cluster_info); // NOTE: runtime query statistics mgr could be visited by query and daemon thread // so it should be created before all query begin and deleted after all query and daemon thread stoppped diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e683b84e2b4..86a3a8e773d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -526,8 +526,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { req.runtime_state->get_unreported_errors(&(params.error_log)); params.__isset.error_log = (!params.error_log.empty()); - if (_exec_env->master_info()->__isset.backend_id) { - params.__set_backend_id(_exec_env->master_info()->backend_id); + if (_exec_env->cluster_info()->backend_id != 0) { + params.__set_backend_id(_exec_env->cluster_info()->backend_id); } TReportExecStatusResult res; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index cd54718bc5f..f06f26b6418 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -351,12 +351,12 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, request.__set_strictMode(false); // this is an internal interface, use admin to pass the auth check request.__set_user("admin"); - if (_exec_env->master_info()->__isset.backend_id) { - request.__set_backend_id(_exec_env->master_info()->backend_id); + if (_exec_env->cluster_info()->backend_id != 0) { + request.__set_backend_id(_exec_env->cluster_info()->backend_id); } else { - LOG(WARNING) << "_exec_env->master_info not set backend_id"; + LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; } - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; st = ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, [&result, &request](FrontendServiceConnection& client) { @@ -440,23 +440,25 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ { status = Status::InternalError(""); }); // commit txn TLoadTxnCommitRequest request; - request.__set_auth_code(0); // this is a fake, fe not check it now + // deprecated and should be removed in 3.1, use token instead + request.__set_auth_code(0); + request.__set_token(_exec_env->cluster_info()->curr_auth_token); request.__set_db_id(db_id); request.__set_table_id(table_id); request.__set_txnId(txn_id); request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); request.__set_groupCommit(true); request.__set_receiveBytes(state->num_bytes_load_total()); - if (_exec_env->master_info()->__isset.backend_id) { - request.__set_backendId(_exec_env->master_info()->backend_id); + if (_exec_env->cluster_info()->backend_id != 0) { + request.__set_backendId(_exec_env->cluster_info()->backend_id); } else { - LOG(WARNING) << "_exec_env->master_info not set backend_id"; + LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; } if (state) { request.__set_commitInfos(state->tablet_commit_infos()); } TLoadTxnCommitResult result; - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; int retry_times = 0; while (retry_times < config::mow_stream_load_commit_retry_times) { st = ThriftRpcHelper::rpc<FrontendServiceClient>( @@ -482,12 +484,14 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ } else { // abort txn TLoadTxnRollbackRequest request; - request.__set_auth_code(0); // this is a fake, fe not check it now + // deprecated and should be removed in 3.1, use token instead + request.__set_auth_code(0); + request.__set_token(_exec_env->cluster_info()->curr_auth_token); request.__set_db_id(db_id); request.__set_txnId(txn_id); request.__set_reason(status.to_string()); TLoadTxnRollbackResult result; - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; st = ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, [&request, &result](FrontendServiceConnection& client) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 2c69b8a5870..06150ae3d20 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -230,7 +230,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { ctx->db = task.db; ctx->table = task.tbl; ctx->label = task.label; + // deprecated, removed in 3.1, use auth token instead. ctx->auth.auth_code = task.auth_code; + ctx->auth.token = _exec_env->cluster_info()->curr_auth_token; if (task.__isset.max_interval_s) { ctx->max_interval_s = task.max_interval_s; diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 77fd80cd528..f07b308850e 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -162,7 +162,7 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par TReportExecStatusParams req; THRIFT_MOVE_VALUES(req, query_profile, profile); - req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id); + req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id); // invalid query id to avoid API compatibility during upgrade req.__set_query_id(TUniqueId()); req.__set_done(is_done); @@ -341,7 +341,7 @@ void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id, } void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { - int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; // 1 get query statistics map std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map; std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout> @@ -515,7 +515,7 @@ void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int6 void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) { std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); - int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { diff --git a/be/src/runtime/small_file_mgr.cpp b/be/src/runtime/small_file_mgr.cpp index 19a05f9c9dc..15d2c937be2 100644 --- a/be/src/runtime/small_file_mgr.cpp +++ b/be/src/runtime/small_file_mgr.cpp @@ -169,10 +169,10 @@ Status SmallFileMgr::_download_file(int64_t file_id, const std::string& md5, HttpClient client; std::stringstream url_ss; - TMasterInfo* master_info = _exec_env->master_info(); - url_ss << master_info->network_address.hostname << ":" << master_info->http_port + ClusterInfo* cluster_info = _exec_env->cluster_info(); + url_ss << cluster_info->master_fe_addr.hostname << ":" << cluster_info->master_fe_http_port << "/api/get_small_file?" - << "file_id=" << file_id << "&token=" << master_info->token; + << "file_id=" << file_id << "&token=" << cluster_info->token; std::string url = url_ss.str(); diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index d04a5463879..784904c78a3 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -942,7 +942,7 @@ Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id << ", finished num: " << finished_num << ", total num:" << total_num; - TNetworkAddress master_addr = _env->master_info()->network_address; + TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr; TSnapshotLoaderReportRequest request; request.job_id = _job_id; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index ec83141893a..482fadac44e 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -173,12 +173,12 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { request.__set_timeout(ctx->timeout_second); } request.__set_request_id(ctx->id.to_thrift()); - request.__set_backend_id(_exec_env->master_info()->backend_id); + request.__set_backend_id(_exec_env->cluster_info()->backend_id); TLoadTxnBeginResult result; Status status; int64_t duration_ns = 0; - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; if (master_addr.hostname.empty() || master_addr.port == 0) { status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master heartbeat yet"); } else { @@ -215,7 +215,7 @@ Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; TLoadTxnCommitResult result; int64_t duration_ns = 0; { @@ -260,7 +260,7 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { request.__set_txnId(ctx->txn_id); } - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; TLoadTxn2PCResult result; int64_t duration_ns = 0; { @@ -312,7 +312,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; TLoadTxnCommitResult result; #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( @@ -344,7 +344,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1); - TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); request.__set_db(ctx->db); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 003f07f1db0..927d4d13814 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -254,7 +254,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { } void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { - int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; int cpu_num = CpuInfo::num_cores(); cpu_num = cpu_num <= 0 ? 1 : cpu_num; uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index e6fdfaa8765..abdef513296 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -599,7 +599,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { } // namespace BaseBackendService::BaseBackendService(ExecEnv* exec_env) - : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, *exec_env->master_info())) {} + : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, exec_env->cluster_info())) {} BaseBackendService::~BaseBackendService() = default; diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index dcc76259868..00935e8fc64 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -83,9 +83,7 @@ #include "util/thrift_server.h" #include "util/uid_util.h" -namespace doris { -class TMasterInfo; -} // namespace doris +namespace doris {} // namespace doris static void help(const char*); @@ -579,11 +577,11 @@ int main(int argc, char** argv) { stop_work_if_error(status, "Doris Be http service did not start correctly, exiting"); // 4. heart beat server - doris::TMasterInfo* master_info = exec_env->master_info(); + doris::ClusterInfo* cluster_info = exec_env->cluster_info(); std::unique_ptr<doris::ThriftServer> heartbeat_thrift_server; doris::Status heartbeat_status = doris::create_heartbeat_server( exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server, - doris::config::heartbeat_service_thread_count, master_info); + doris::config::heartbeat_service_thread_count, cluster_info); stop_work_if_error(heartbeat_status, "Heartbeat services did not start correctly, exiting"); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 89b43ec5223..ae84081813f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -500,7 +500,7 @@ Status PInternalService::_exec_plan_fragment_impl( const std::function<void(RuntimeState*, Status*)>& cb) { // Sometimes the BE do not receive the first heartbeat message and it receives request from FE // If BE execute this fragment, it will core when it wants to get some property from master info. - if (ExecEnv::GetInstance()->master_info() == nullptr) { + if (ExecEnv::GetInstance()->cluster_info() == nullptr) { return Status::InternalError( "Have not receive the first heartbeat message from master, not ready to provide " "service"); diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 180d18b0c2c..289930b16bc 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -278,7 +278,7 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { // _state->execution_timeout() is seconds, change to milliseconds int time_out = _state->execution_timeout() * 1000; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TFetchSchemaTableDataResult result; RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 4bc87dff489..80ce9d494d5 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -46,7 +46,7 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) { Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; @@ -167,4 +167,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { return Status::OK(); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 74a2830a191..2de21edd80b 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -102,7 +102,7 @@ Status VRowDistribution::automatic_create_partition() { request.__set_be_endpoint(be_endpoint); VLOG_NOTICE << "automatic partition rpc begin request " << request; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; int time_out = _state->execution_timeout() * 1000; RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, @@ -175,7 +175,7 @@ Status VRowDistribution::_replace_overwriting_partition() { request.__set_be_endpoint(be_endpoint); VLOG_NOTICE << "auto detect replace partition request: " << request; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; int time_out = _state->execution_timeout() * 1000; RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, diff --git a/be/test/agent/heartbeat_server_test.cpp b/be/test/agent/heartbeat_server_test.cpp index db3a7a2eb78..f0c93a8caca 100644 --- a/be/test/agent/heartbeat_server_test.cpp +++ b/be/test/agent/heartbeat_server_test.cpp @@ -35,12 +35,12 @@ namespace doris { TEST(HeartbeatTest, TestHeartbeat) { setenv("DORIS_HOME", "./", 1); THeartbeatResult heartbeat_result; - TMasterInfo ori_master_info; - ori_master_info.cluster_id = -1; - ori_master_info.epoch = 0; - ori_master_info.network_address.hostname = ""; - ori_master_info.network_address.port = 0; - HeartbeatServer heartbeat_server(&ori_master_info); + ClusterInfo ori_cluster_info; + ori_cluster_info.cluster_id = -1; + ori_cluster_info.epoch = 0; + ori_cluster_info.network_address.hostname = ""; + ori_cluster_info.network_address.port = 0; + HeartbeatServer heartbeat_server(&ori_cluster_info); heartbeat_server.heartbeat(heartbeat_result, master_info); EXPECT_EQ(TStatusCode::OK, heartbeat_result.status.status_code); diff --git a/be/test/agent/mock_utils.h b/be/test/agent/mock_utils.h index 3a14ca79626..b49ff3e372a 100644 --- a/be/test/agent/mock_utils.h +++ b/be/test/agent/mock_utils.h @@ -33,7 +33,7 @@ public: class MockMasterServerClient : public MasterServerClient { public: - MockMasterServerClient(const TMasterInfo& master_info, + MockMasterServerClient(const ClusterInfo* cluster_info, FrontendServiceClientCache* client_cache); MOCK_METHOD2(finish_task, Status(const TFinishTaskRequest request, TMasterResult* result)); MOCK_METHOD2(report, Status(const TReportRequest request, TMasterResult* result)); diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp index 0c7fd88396b..7be29e6feb4 100644 --- a/be/test/agent/task_worker_pool_test.cpp +++ b/be/test/agent/task_worker_pool_test.cpp @@ -27,6 +27,7 @@ #include "olap/options.h" #include "olap/storage_engine.h" +#include "runtime/cluster_info.h" namespace doris { @@ -106,14 +107,14 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) { ExecEnv::GetInstance()->set_storage_engine(std::make_unique<StorageEngine>(EngineOptions {})); Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }}; - TMasterInfo master_info; + ClusterInfo cluster_info; std::atomic_int count {0}; - ReportWorker worker("test", master_info, 1, [&] { ++count; }); + ReportWorker worker("test", &cluster_info, 1, [&] { ++count; }); worker.notify(); // Not received heartbeat yet, ignore std::this_thread::sleep_for(100ms); - master_info.network_address.__set_port(9030); + cluster_info.master_fe_addr.__set_port(9030); worker.notify(); std::this_thread::sleep_for(100ms); EXPECT_EQ(count.load(), 1); diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 4e2ada03c66..d42e0a6775e 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -140,7 +140,7 @@ TEST_F(HttpClientTest, get_normal) { client.set_basic_auth("test1", ""); std::string response; st = client.execute(&response); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st; EXPECT_STREQ("test1", response.c_str()); // for head @@ -380,7 +380,7 @@ TEST_F(HttpClientTest, enable_http_auth) { std::cout << "st = " << st << "\n"; std::cout << "response = " << response << "\n"; std::cout << "st.msg() = " << st.msg() << "\n"; - EXPECT_TRUE(!st.ok()); + EXPECT_TRUE(!st.ok()) << st; EXPECT_TRUE(st.msg().find("The requested URL returned error") != std::string::npos); } @@ -477,6 +477,36 @@ TEST_F(HttpClientTest, enable_http_auth) { EXPECT_TRUE(st.msg().find("Operation timed out after") != std::string::npos); } + // valid token + { + config::enable_all_http_auth = true; + std::string url = hostname + "/metrics"; + HttpClient client; + auto st = client.init(url); + EXPECT_TRUE(st.ok()); + client.set_method(GET); + client.set_auth_token("valid_token"); + client.set_timeout_ms(200); + std::string response; + st = client.execute(&response); + EXPECT_TRUE(st.ok()) << st; + } + + // invalid token + { + config::enable_all_http_auth = true; + std::string url = hostname + "/metrics"; + HttpClient client; + auto st = client.init(url); + EXPECT_TRUE(st.ok()); + client.set_method(GET); + client.set_auth_token("invalid_token"); + client.set_timeout_ms(200); + std::string response; + st = client.execute(&response); + EXPECT_TRUE(!st.ok()) << st; + } + { config::enable_all_http_auth = true; std::string url = hostname + "/api/glog/adjust"; diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index afb4c7696ef..32162593fc0 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -55,10 +55,10 @@ public: void SetUp() override { prepare(); _env = ExecEnv::GetInstance(); - _env->_master_info = new TMasterInfo(); - _env->_master_info->network_address.hostname = "host name"; - _env->_master_info->network_address.port = 1234; - _env->_master_info->backend_id = 1001; + _env->_cluster_info = new ClusterInfo(); + _env->_cluster_info->master_fe_addr.hostname = "host name"; + _env->_cluster_info->master_fe_addr.port = 1234; + _env->_cluster_info->backend_id = 1001; _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); @@ -77,7 +77,7 @@ public: SAFE_STOP(_env->_wal_manager); SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_internal_client_cache); - SAFE_DELETE(_env->_master_info); + SAFE_DELETE(_env->_cluster_info); } void prepare() { @@ -242,4 +242,4 @@ TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError("")); EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 338b82c6eba..5c2b39bce1f 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -55,7 +55,7 @@ public: k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env.set_master_info(new TMasterInfo()); + _env.set_cluster_info(new ClusterInfo()); _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); @@ -63,7 +63,7 @@ public: config::max_consumer_num_per_group = 3; } - void TearDown() override { delete _env.master_info(); } + void TearDown() override { delete _env.cluster_info(); } ExecEnv _env; }; @@ -121,4 +121,4 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { executor.stop(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp b/be/test/vec/exec/vfile_scanner_exception_test.cpp index 1d565c7e0ce..4b6ce46bd88 100644 --- a/be/test/vec/exec/vfile_scanner_exception_test.cpp +++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp @@ -26,6 +26,7 @@ #include "io/fs/local_file_system.h" #include "olap/wal/wal_manager.h" #include "pipeline/exec/file_scan_operator.h" +#include "runtime/cluster_info.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" @@ -107,7 +108,7 @@ private: TFileRangeDesc _range_desc; TFileScanRange _scan_range; std::unique_ptr<ShardedKVCache> _kv_cache = nullptr; - std::unique_ptr<TMasterInfo> _master_info = nullptr; + std::unique_ptr<ClusterInfo> _cluster_info = nullptr; }; void VfileScannerExceptionTest::_init_desc_table() { @@ -266,12 +267,12 @@ void VfileScannerExceptionTest::init() { _scan_range.params.format_type = TFileFormatType::FORMAT_JNI; _kv_cache.reset(new ShardedKVCache(48)); - _master_info.reset(new TMasterInfo()); + _cluster_info.reset(new ClusterInfo()); _env = ExecEnv::GetInstance(); - _env->_master_info = _master_info.get(); - _env->_master_info->network_address.hostname = "host name"; - _env->_master_info->network_address.port = _backend_id; - _env->_master_info->backend_id = 1001; + _env->_cluster_info = _cluster_info.get(); + _env->_cluster_info->master_fe_addr.hostname = "host name"; + _env->_cluster_info->master_fe_addr.port = _backend_id; + _env->_cluster_info->backend_id = 1001; _env->_wal_manager = 0; } diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 0944193e6d0..5c4056a8c24 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -26,6 +26,7 @@ #include "io/fs/local_file_system.h" #include "olap/wal/wal_manager.h" #include "pipeline/exec/file_scan_operator.h" +#include "runtime/cluster_info.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" @@ -116,7 +117,7 @@ private: TFileRangeDesc _range_desc; TFileScanRange _scan_range; std::unique_ptr<ShardedKVCache> _kv_cache = nullptr; - std::unique_ptr<TMasterInfo> _master_info = nullptr; + std::unique_ptr<ClusterInfo> _cluster_info = nullptr; }; void VWalScannerTest::_init_desc_table() { @@ -279,12 +280,12 @@ void VWalScannerTest::init() { _scan_range.params.format_type = TFileFormatType::FORMAT_WAL; _kv_cache.reset(new ShardedKVCache(48)); - _master_info.reset(new TMasterInfo()); + _cluster_info.reset(new ClusterInfo()); _env = ExecEnv::GetInstance(); - _env->_master_info = _master_info.get(); - _env->_master_info->network_address.hostname = "host name"; - _env->_master_info->network_address.port = _backend_id; - _env->_master_info->backend_id = 1001; + _env->_cluster_info = _cluster_info.get(); + _env->_cluster_info->master_fe_addr.hostname = "host name"; + _env->_cluster_info->master_fe_addr.port = _backend_id; + _env->_cluster_info->backend_id = 1001; _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 26c3cc1b1b4..e48b514cdd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -571,6 +571,8 @@ public class Env { private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids); + private TokenManager tokenManager; + // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it. private final Map<String, Supplier<MasterDaemon>> configtoThreads = ImmutableMap .of("dynamic_partition_check_interval_seconds", this::getDynamicPartitionScheduler); @@ -817,6 +819,7 @@ public class Env { this.sqlCacheManager = new NereidsSqlCacheManager(); this.splitSourceManager = new SplitSourceManager(); this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr(); + this.tokenManager = new TokenManager(); } public static void destroyCheckpoint() { @@ -1852,6 +1855,7 @@ public class Env { // start threads that should run on all FE protected void startNonMasterDaemonThreads() { // start load manager thread + tokenManager.start(); loadManager.start(); tabletStatMgr.start(); @@ -4368,6 +4372,10 @@ public class Env { return loadManager; } + public TokenManager getTokenManager() { + return tokenManager; + } + public ProgressManager getProgressManager() { return progressManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java rename to fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java index 21e9f9b0434..c6c6911a044 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.load.loadv2; +package org.apache.doris.catalog; -import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.CustomThreadFactory; @@ -39,6 +38,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +/** + * This class is used to manage the token for internal authentication. + * It will generate a new token every 12 hours(Config.token_generate_period_hour) + * and keep at most 6 tokens(Config.token_queue_size). + * So each token will be valid for 3 days. + * Only Master FE can generate a new token. + */ public class TokenManager { private static final Logger LOG = LogManager.getLogger(TokenManager.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 70492e5eab2..2f9efc1ed1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -575,7 +575,7 @@ public class LoadAction extends RestBaseController { // So this function is not widely tested under general scenario private boolean checkClusterToken(String token) { try { - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + return Env.getCurrentEnv().getTokenManager().checkAuthToken(token); } catch (UserException e) { throw new UnauthorizedException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 7887d8c602b..07c459d61cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -101,16 +101,13 @@ public class LoadManager implements Writable { private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private MysqlLoadManager mysqlLoadManager; - private TokenManager tokenManager; public LoadManager(LoadJobScheduler loadJobScheduler) { this.loadJobScheduler = loadJobScheduler; - this.tokenManager = new TokenManager(); - this.mysqlLoadManager = new MysqlLoadManager(tokenManager); + this.mysqlLoadManager = new MysqlLoadManager(); } public void start() { - tokenManager.start(); mysqlLoadManager.start(); } @@ -184,10 +181,6 @@ public class LoadManager implements Writable { return mysqlLoadManager; } - public TokenManager getTokenManager() { - return tokenManager; - } - public void replayCreateLoadJob(LoadJob loadJob) { createLoadJob(loadJob); LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index d09e73b4a33..68dffbfb3e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -76,7 +76,6 @@ public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); private ThreadPoolExecutor mysqlLoadPool; - private final TokenManager tokenManager; private static class MySqlLoadContext { private boolean finished; @@ -143,8 +142,7 @@ public class MysqlLoadManager { private EvictingQueue<MySqlLoadFailRecord> failedRecords; private ScheduledExecutorService periodScheduler; - public MysqlLoadManager(TokenManager tokenManager) { - this.tokenManager = tokenManager; + public MysqlLoadManager() { } public void start() { @@ -178,7 +176,7 @@ public class MysqlLoadManager { VariableMgr.setVar(sessionVariable, new SetVar(SessionVariable.QUERY_TIMEOUT, new StringLiteral(String.valueOf(newTimeOut)))); } - String token = tokenManager.acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); boolean clientLocal = dataDesc.isClientLocal(); MySqlLoadContext loadContext = new MySqlLoadContext(); loadContextMap.put(loadId, loadContext); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index b00f89db4b5..f4a7259bf71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -136,7 +136,7 @@ public class CanalSyncChannel extends SyncChannel { FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index cf9a8797161..6b301c16f73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -198,7 +198,7 @@ public class InsertUtils { String label = txnEntry.getLabel(); try { long txnId; - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 06789b864b6..644bddee58b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -187,7 +187,7 @@ public class AuditLoader extends Plugin implements AuditPlugin { String token = ""; try { // Acquire token from master - token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + token = Env.getCurrentEnv().getTokenManager().acquireToken(); } catch (Exception e) { LOG.warn("Failed to get auth token: {}", e); discardLogNum += auditLogNum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 7b557418b80..9daaf525ca4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2247,10 +2247,10 @@ public class StmtExecutor { ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); txnConf.setTxnId(txnId); - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); txnConf.setToken(token); } else { - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); TLoadTxnBeginRequest request = new TLoadTxnBeginRequest(); request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ba49d6da8e6..ac342672638 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1230,7 +1230,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException { if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (Strings.isNullOrEmpty(request.getToken())) { checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), @@ -1464,7 +1464,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws UserException { if (request.isSetAuthCode()) { - // CHECKSTYLE IGNORE THIS LINE + // TODO: deprecated, removed in 3.1, use token instead. } else if (request.isSetToken()) { if (!checkToken(request.getToken())) { throw new AuthenticationException("Invalid token: " + request.getToken()); @@ -1650,7 +1650,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // timeout private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException { if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (request.isSetToken()) { checkToken(request.getToken()); } else { @@ -1791,7 +1791,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // Step 3: check auth if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (request.isSetToken()) { checkToken(request.getToken()); } else { @@ -1871,7 +1871,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserException { if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (request.isSetToken()) { checkToken(request.getToken()); } else { @@ -1994,7 +1994,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // Step 3: check auth if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (request.isSetToken()) { checkToken(request.getToken()); } else { @@ -2159,7 +2159,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { ConnectContext ctx = ConnectContext.get(); if (request.isSetAuthCode()) { - // TODO(cmy): find a way to check + // TODO: deprecated, removed in 3.1, use token instead. } else if (Strings.isNullOrEmpty(request.getToken())) { checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), @@ -2390,7 +2390,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); try { - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); result.setToken(token); } catch (Throwable e) { LOG.warn("catch unknown result.", e); @@ -2409,7 +2409,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { LOG.debug("receive check token request from client: {}", clientAddr); } try { - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + return Env.getCurrentEnv().getTokenManager().checkAuthToken(token); } catch (Throwable e) { LOG.warn("catch unknown result.", e); return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index fb6853e83c3..d7eff484c6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -250,6 +250,7 @@ public class HeartbeatMgr extends MasterDaemon { copiedMasterInfo.setHeartbeatFlags(flags); copiedMasterInfo.setBackendId(backendId); copiedMasterInfo.setFrontendInfos(feInfos); + copiedMasterInfo.setAuthToken(Env.getCurrentEnv().getTokenManager().acquireToken()); if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index fcab55866ed..25c4ff4b3b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -225,7 +225,7 @@ public class TransactionEntry { ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, timeoutSecond); } else { - String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + String token = Env.getCurrentEnv().getTokenManager().acquireToken(); MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ConnectContext.get()); TLoadTxnBeginRequest request = new TLoadTxnBeginRequest(); request.setDb(database.getFullName()).setTbl(table.getName()).setToken(token) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java index 13ae9b6e44e..2f8f1437344 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.catalog.TokenManager; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 77e49e6c1fa..7078d5eaeab 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -645,12 +645,12 @@ struct TLoadTxnBeginRequest { 6: optional string user_ip 7: required string label 8: optional i64 timestamp // deprecated, use request_id instead - 9: optional i64 auth_code + 9: optional i64 auth_code // deprecated, use token instead // The real value of timeout should be i32. i64 ensures the compatibility of interface. 10: optional i64 timeout 11: optional Types.TUniqueId request_id 12: optional string token - 13: optional string auth_code_uuid + 13: optional string auth_code_uuid // deprecated, use token instead 14: optional i64 table_id 15: optional i64 backend_id } @@ -670,7 +670,7 @@ struct TBeginTxnRequest { 5: optional list<i64> table_ids 6: optional string user_ip 7: optional string label - 8: optional i64 auth_code + 8: optional i64 auth_code // deprecated, use token instead // The real value of timeout should be i32. i64 ensures the compatibility of interface. 9: optional i64 timeout 10: optional Types.TUniqueId request_id @@ -718,7 +718,7 @@ struct TStreamLoadPutRequest { 14: optional string columnSeparator 15: optional string partitions - 16: optional i64 auth_code + 16: optional i64 auth_code // deprecated, use token instead 17: optional bool negative 18: optional i32 timeout 19: optional bool strictMode @@ -831,14 +831,14 @@ struct TLoadTxnCommitRequest { 7: required i64 txnId 8: required bool sync 9: optional list<Types.TTabletCommitInfo> commitInfos - 10: optional i64 auth_code + 10: optional i64 auth_code // deprecated, use token instead 11: optional TTxnCommitAttachment txnCommitAttachment 12: optional i64 thrift_rpc_timeout_ms 13: optional string token 14: optional i64 db_id 15: optional list<string> tbls 16: optional i64 table_id - 17: optional string auth_code_uuid + 17: optional string auth_code_uuid // deprecated, use token instead 18: optional bool groupCommit 19: optional i64 receiveBytes 20: optional i64 backendId @@ -856,7 +856,7 @@ struct TCommitTxnRequest { 5: optional string user_ip 6: optional i64 txn_id 7: optional list<Types.TTabletCommitInfo> commit_infos - 8: optional i64 auth_code + 8: optional i64 auth_code // deprecated, use token instead 9: optional TTxnCommitAttachment txn_commit_attachment 10: optional i64 thrift_rpc_timeout_ms 11: optional string token @@ -879,13 +879,13 @@ struct TLoadTxn2PCRequest { 5: optional string user_ip 6: optional i64 txnId 7: optional string operation - 8: optional i64 auth_code + 8: optional i64 auth_code // deprecated, use token instead 9: optional string token 10: optional i64 thrift_rpc_timeout_ms 11: optional string label // For cloud - 1000: optional string auth_code_uuid + 1000: optional string auth_code_uuid // deprecated, use token instead } struct TLoadTxn2PCResult { @@ -900,7 +900,7 @@ struct TRollbackTxnRequest { 5: optional string user_ip 6: optional i64 txn_id 7: optional string reason - 9: optional i64 auth_code + 9: optional i64 auth_code // deprecated, use token instead 10: optional TTxnCommitAttachment txn_commit_attachment 11: optional string token 12: optional i64 db_id @@ -920,12 +920,12 @@ struct TLoadTxnRollbackRequest { 6: optional string user_ip 7: required i64 txnId 8: optional string reason - 9: optional i64 auth_code + 9: optional i64 auth_code // deprecated, use token instead 10: optional TTxnCommitAttachment txnCommitAttachment 11: optional string token 12: optional i64 db_id 13: optional list<string> tbls - 14: optional string auth_code_uuid + 14: optional string auth_code_uuid // deprecated, use token instead 15: optional string label } diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index acdc608f21b..47c41650b78 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -43,6 +43,7 @@ struct TMasterInfo { 11: optional string cloud_unique_id; // See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning 12: optional i64 tablet_report_inactive_duration_ms; + 13: optional string auth_token; } struct TBackendInfo { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org