This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5af0d377399 [feature](api) add BE HTTP /api/load_streams (#36312) 5af0d377399 is described below commit 5af0d3773993228b0197f4c936726f9c81ef93ba Author: Kaijie Chen <c...@apache.org> AuthorDate: Sat Jun 15 10:11:36 2024 +0800 [feature](api) add BE HTTP /api/load_streams (#36312) Add BE HTTP API `/api/load_streams` to get all load_id for existing load_streams. Example: ```console $ curl http://localhost:8040/api/load_streams {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[]},"count":0} $ curl http://localhost:8040/api/load_streams {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[{"load_id":"43fc5dd321f04407-96b7b2cd38323c3a"}]},"count":1} $ curl http://localhost:8040/api/load_streams {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[{"load_id":"02b91d709808469b-8a43d591df446353"}]},"count":1} $ curl http://localhost:8040/api/load_streams {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[]},"count":0} ``` ```console $ curl -s http://localhost:8040/api/load_streams | jq { "msg": "OK", "code": 0, "data": { "host": "10.16.10.7", "load_streams": [ { "load_id": "acdfeb53e3c34037-91fd829d1f2fe0de" }, { "load_id": "e03742da4e5a4862-a7b0f36ad3b294c2" } ] }, "count": 2 } ``` --- be/src/http/action/load_stream_action.cpp | 66 +++++++++++++++++++++++++++++++ be/src/http/action/load_stream_action.h | 42 ++++++++++++++++++++ be/src/runtime/exec_env.cpp | 1 + be/src/runtime/exec_env.h | 2 + be/src/runtime/exec_env_init.cpp | 5 +++ be/src/runtime/load_stream_mgr.cpp | 15 ++----- be/src/runtime/load_stream_mgr.h | 16 +++++++- be/src/service/http_service.cpp | 5 +++ be/src/service/internal_service.cpp | 10 ++--- be/src/service/internal_service.h | 3 -- be/test/runtime/load_stream_test.cpp | 4 +- 11 files changed, 147 insertions(+), 22 deletions(-) diff --git a/be/src/http/action/load_stream_action.cpp b/be/src/http/action/load_stream_action.cpp new file mode 100644 index 00000000000..087f3b0f927 --- /dev/null +++ b/be/src/http/action/load_stream_action.cpp @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/load_stream_action.h" + +#include <algorithm> +#include <cstdint> +#include <cstdlib> +#include <limits> +#include <string> +#include <vector> + +#include "cloud/config.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/olap_common.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "service/backend_options.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; + +void LoadStreamAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + HttpChannel::send_reply(req, HttpStatus::OK, _get_load_streams().ToString()); +} + +EasyJson LoadStreamAction::_get_load_streams() { + EasyJson response; + + auto load_streams = ExecEnv::GetInstance()->load_stream_mgr()->get_all_load_stream_ids(); + + response["msg"] = "OK"; + response["code"] = 0; + EasyJson data = response.Set("data", EasyJson::kObject); + data["host"] = BackendOptions::get_localhost(); + EasyJson tablets = data.Set("load_streams", EasyJson::kArray); + for (auto& load_id : load_streams) { + EasyJson tablet = tablets.PushBack(EasyJson::kObject); + tablet["load_id"] = load_id; + } + response["count"] = load_streams.size(); + return response; +} + +} // namespace doris diff --git a/be/src/http/action/load_stream_action.h b/be/src/http/action/load_stream_action.h new file mode 100644 index 00000000000..25689d3f88e --- /dev/null +++ b/be/src/http/action/load_stream_action.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "http/http_handler.h" +#include "util/easy_json.h" + +namespace doris { +class HttpRequest; + +class ExecEnv; + +// Get BE load stream info from http API. +class LoadStreamAction final : public HttpHandler { +public: + LoadStreamAction() = default; + + ~LoadStreamAction() override = default; + + void handle(HttpRequest* req) override; + +private: + static EasyJson _get_load_streams(); +}; +} // namespace doris diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 89d728a0086..c714db2d5e4 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -30,6 +30,7 @@ #include "olap/tablet_manager.h" #include "runtime/fragment_mgr.h" #include "runtime/frontend_info.h" +#include "runtime/load_stream_mgr.h" #include "util/debug_util.h" #include "util/time.h" #include "vec/sink/delta_writer_v2_pool.h" diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 2bba483249d..b5decae0afb 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -215,6 +215,7 @@ public: return _function_client_cache; } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } + LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); } std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; } @@ -382,6 +383,7 @@ private: BfdParser* _bfd_parser = nullptr; BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; + std::unique_ptr<LoadStreamMgr> _load_stream_mgr; // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr; BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 50ef300412c..df4d60c789b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -67,6 +67,7 @@ #include "runtime/heartbeat_flags.h" #include "runtime/load_channel_mgr.h" #include "runtime/load_path_mgr.h" +#include "runtime/load_stream_mgr.h" #include "runtime/memory/cache_manager.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" @@ -281,6 +282,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _bfd_parser = BfdParser::create(); _broker_mgr = new BrokerMgr(this); _load_channel_mgr = new LoadChannelMgr(); + auto num_flush_threads = std::min( + _store_paths.size() * config::flush_thread_num_per_store, + static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); + _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads); _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); _function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index 4eee9ba64a9..67739a0c0b0 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -32,18 +32,11 @@ namespace doris { -LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num, - FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool) - : _num_threads(segment_file_writer_thread_num), - _heavy_work_pool(heavy_work_pool), - _light_work_pool(light_work_pool) { - uint32_t num_cpu = std::thread::hardware_concurrency(); - uint32_t thread_num = num_cpu == 0 ? segment_file_writer_thread_num - : std::min(segment_file_writer_thread_num, - num_cpu * config::max_flush_thread_num_per_cpu); +LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) + : _num_threads(segment_file_writer_thread_num) { static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool") - .set_min_threads(thread_num) - .set_max_threads(thread_num) + .set_min_threads(segment_file_writer_thread_num) + .set_max_threads(segment_file_writer_thread_num) .build(&_file_writer_thread_pool)); } diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index 9e875f3b829..45abd9c8470 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -34,8 +34,7 @@ class POpenStreamSinkRequest; class LoadStreamMgr { public: - LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool, - FifoThreadPool* light_work_pool); + LoadStreamMgr(uint32_t segment_file_writer_thread_num); ~LoadStreamMgr(); Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream); @@ -47,12 +46,25 @@ public: } } + std::vector<std::string> get_all_load_stream_ids() { + std::vector<std::string> result; + std::lock_guard<std::mutex> lock(_lock); + + for (auto& [id, _] : _load_streams_map) { + result.push_back(id.to_string()); + } + return result; + } + // only used by ut size_t get_load_stream_num() { return _load_streams_map.size(); } FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; } FifoThreadPool* light_work_pool() { return _light_work_pool; } + void set_heavy_work_pool(FifoThreadPool* pool) { _heavy_work_pool = pool; } + void set_light_work_pool(FifoThreadPool* pool) { _light_work_pool = pool; } + private: std::mutex _lock; std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map; diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index ba56d2cb857..72a0401a180 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -44,6 +44,7 @@ #include "http/action/health_action.h" #include "http/action/http_stream.h" #include "http/action/jeprofile_actions.h" +#include "http/action/load_stream_action.h" #include "http/action/meta_action.h" #include "http/action/metrics_action.h" #include "http/action/pad_rowset_action.h" @@ -168,6 +169,10 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}", query_pipeline_task_action); + // Register BE LoadStream action + LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action); + // Register Tablets Info action TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1cfa0ff0965..4c5b9c33de4 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -200,10 +200,7 @@ PInternalService::PInternalService(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light"), - _load_stream_mgr(new LoadStreamMgr( - exec_env->store_paths().size() * config::flush_thread_num_per_store, - &_heavy_work_pool, &_light_work_pool)) { + "brpc_light") { REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -222,6 +219,9 @@ PInternalService::PInternalService(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); + _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); + _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter)); } @@ -378,7 +378,7 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control } LoadStream* load_stream = nullptr; - auto st = _load_stream_mgr->open_load_stream(request, load_stream); + auto st = _exec_env->load_stream_mgr()->open_load_stream(request, load_stream); if (!st.ok()) { st.to_protobuf(response->mutable_status()); return; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index fdf3a183d96..4a1f84a6731 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -36,7 +36,6 @@ class StorageEngine; class ExecEnv; class PHandShakeRequest; class PHandShakeResponse; -class LoadStreamMgr; class RuntimeState; template <typename T> @@ -266,8 +265,6 @@ protected: // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; - - std::unique_ptr<LoadStreamMgr> _load_stream_mgr; }; // `StorageEngine` mixin for `PInternalService` diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 7e18553e967..b9f22a4e751 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -603,7 +603,9 @@ public: EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok()); - _load_stream_mgr = std::make_unique<LoadStreamMgr>(4, &_heavy_work_pool, &_light_work_pool); + _load_stream_mgr = std::make_unique<LoadStreamMgr>(4); + _load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool); + _load_stream_mgr->set_light_work_pool(&_light_work_pool); _stream_service = new StreamService(_load_stream_mgr.get()); CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE)); brpc::ServerOptions server_options; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org