This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af0d377399 [feature](api) add BE HTTP /api/load_streams (#36312)
5af0d377399 is described below

commit 5af0d3773993228b0197f4c936726f9c81ef93ba
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Sat Jun 15 10:11:36 2024 +0800

    [feature](api) add BE HTTP /api/load_streams (#36312)
    
    Add a BE HTTP API, `/api/load_streams`, that returns the load_id of
    every existing load stream.
    
    Example:
    
    ```console
    $ curl http://localhost:8040/api/load_streams
    {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[]},"count":0}
    $ curl http://localhost:8040/api/load_streams
    {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[{"load_id":"43fc5dd321f04407-96b7b2cd38323c3a"}]},"count":1}
    $ curl http://localhost:8040/api/load_streams
    {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[{"load_id":"02b91d709808469b-8a43d591df446353"}]},"count":1}
    $ curl http://localhost:8040/api/load_streams
    {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_streams":[]},"count":0}
    ```
    
    ```console
    $ curl -s http://localhost:8040/api/load_streams | jq
    {
      "msg": "OK",
      "code": 0,
      "data": {
        "host": "10.16.10.7",
        "load_streams": [
          {
            "load_id": "acdfeb53e3c34037-91fd829d1f2fe0de"
          },
          {
            "load_id": "e03742da4e5a4862-a7b0f36ad3b294c2"
          }
        ]
      },
      "count": 2
    }
    ```
---
 be/src/http/action/load_stream_action.cpp | 66 +++++++++++++++++++++++++++++++
 be/src/http/action/load_stream_action.h   | 42 ++++++++++++++++++++
 be/src/runtime/exec_env.cpp               |  1 +
 be/src/runtime/exec_env.h                 |  2 +
 be/src/runtime/exec_env_init.cpp          |  5 +++
 be/src/runtime/load_stream_mgr.cpp        | 15 ++-----
 be/src/runtime/load_stream_mgr.h          | 16 +++++++-
 be/src/service/http_service.cpp           |  5 +++
 be/src/service/internal_service.cpp       | 10 ++---
 be/src/service/internal_service.h         |  3 --
 be/test/runtime/load_stream_test.cpp      |  4 +-
 11 files changed, 147 insertions(+), 22 deletions(-)

diff --git a/be/src/http/action/load_stream_action.cpp b/be/src/http/action/load_stream_action.cpp
new file mode 100644
index 00000000000..087f3b0f927
--- /dev/null
+++ b/be/src/http/action/load_stream_action.cpp
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/load_stream_action.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "cloud/config.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/olap_common.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "runtime/exec_env.h"
+#include "runtime/load_stream_mgr.h"
+#include "service/backend_options.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+void LoadStreamAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
+    HttpChannel::send_reply(req, HttpStatus::OK, 
_get_load_streams().ToString());
+}
+
+EasyJson LoadStreamAction::_get_load_streams() {
+    EasyJson response;
+
+    auto load_streams = 
ExecEnv::GetInstance()->load_stream_mgr()->get_all_load_stream_ids();
+
+    response["msg"] = "OK";
+    response["code"] = 0;
+    EasyJson data = response.Set("data", EasyJson::kObject);
+    data["host"] = BackendOptions::get_localhost();
+    EasyJson tablets = data.Set("load_streams", EasyJson::kArray);
+    for (auto& load_id : load_streams) {
+        EasyJson tablet = tablets.PushBack(EasyJson::kObject);
+        tablet["load_id"] = load_id;
+    }
+    response["count"] = load_streams.size();
+    return response;
+}
+
+} // namespace doris
diff --git a/be/src/http/action/load_stream_action.h b/be/src/http/action/load_stream_action.h
new file mode 100644
index 00000000000..25689d3f88e
--- /dev/null
+++ b/be/src/http/action/load_stream_action.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "http/http_handler.h"
+#include "util/easy_json.h"
+
+namespace doris {
+class HttpRequest;
+
+class ExecEnv;
+
+// Get BE load stream info from http API.
+class LoadStreamAction final : public HttpHandler {
+public:
+    LoadStreamAction() = default;
+
+    ~LoadStreamAction() override = default;
+
+    void handle(HttpRequest* req) override;
+
+private:
+    static EasyJson _get_load_streams();
+};
+} // namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 89d728a0086..c714db2d5e4 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -30,6 +30,7 @@
 #include "olap/tablet_manager.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/frontend_info.h"
+#include "runtime/load_stream_mgr.h"
 #include "util/debug_util.h"
 #include "util/time.h"
 #include "vec/sink/delta_writer_v2_pool.h"
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 2bba483249d..b5decae0afb 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -215,6 +215,7 @@ public:
         return _function_client_cache;
     }
     LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
+    LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
     std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return 
_new_load_stream_mgr; }
     SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
     doris::vectorized::SpillStreamManager* spill_stream_mgr() { return 
_spill_stream_mgr; }
@@ -382,6 +383,7 @@ private:
     BfdParser* _bfd_parser = nullptr;
     BrokerMgr* _broker_mgr = nullptr;
     LoadChannelMgr* _load_channel_mgr = nullptr;
+    std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
     // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control 
its life cycle.
     std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
     BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 50ef300412c..df4d60c789b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -67,6 +67,7 @@
 #include "runtime/heartbeat_flags.h"
 #include "runtime/load_channel_mgr.h"
 #include "runtime/load_path_mgr.h"
+#include "runtime/load_stream_mgr.h"
 #include "runtime/memory/cache_manager.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
@@ -281,6 +282,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _bfd_parser = BfdParser::create();
     _broker_mgr = new BrokerMgr(this);
     _load_channel_mgr = new LoadChannelMgr();
+    auto num_flush_threads = std::min(
+            _store_paths.size() * config::flush_thread_num_per_store,
+            static_cast<size_t>(CpuInfo::num_cores()) * 
config::max_flush_thread_num_per_cpu);
+    _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
     _new_load_stream_mgr = NewLoadStreamMgr::create_shared();
     _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
     _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 4eee9ba64a9..67739a0c0b0 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -32,18 +32,11 @@
 
 namespace doris {
 
-LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
-                             FifoThreadPool* heavy_work_pool, FifoThreadPool* 
light_work_pool)
-        : _num_threads(segment_file_writer_thread_num),
-          _heavy_work_pool(heavy_work_pool),
-          _light_work_pool(light_work_pool) {
-    uint32_t num_cpu = std::thread::hardware_concurrency();
-    uint32_t thread_num = num_cpu == 0 ? segment_file_writer_thread_num
-                                       : 
std::min(segment_file_writer_thread_num,
-                                                  num_cpu * 
config::max_flush_thread_num_per_cpu);
+LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
+        : _num_threads(segment_file_writer_thread_num) {
     static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
-                              .set_min_threads(thread_num)
-                              .set_max_threads(thread_num)
+                              .set_min_threads(segment_file_writer_thread_num)
+                              .set_max_threads(segment_file_writer_thread_num)
                               .build(&_file_writer_thread_pool));
 }
 
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 9e875f3b829..45abd9c8470 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -34,8 +34,7 @@ class POpenStreamSinkRequest;
 
 class LoadStreamMgr {
 public:
-    LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* 
heavy_work_pool,
-                  FifoThreadPool* light_work_pool);
+    LoadStreamMgr(uint32_t segment_file_writer_thread_num);
     ~LoadStreamMgr();
 
     Status open_load_stream(const POpenLoadStreamRequest* request, 
LoadStream*& load_stream);
@@ -47,12 +46,25 @@ public:
         }
     }
 
+    std::vector<std::string> get_all_load_stream_ids() {
+        std::vector<std::string> result;
+        std::lock_guard<std::mutex> lock(_lock);
+
+        for (auto& [id, _] : _load_streams_map) {
+            result.push_back(id.to_string());
+        }
+        return result;
+    }
+
     // only used by ut
     size_t get_load_stream_num() { return _load_streams_map.size(); }
 
     FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
     FifoThreadPool* light_work_pool() { return _light_work_pool; }
 
+    void set_heavy_work_pool(FifoThreadPool* pool) { _heavy_work_pool = pool; }
+    void set_light_work_pool(FifoThreadPool* pool) { _light_work_pool = pool; }
+
 private:
     std::mutex _lock;
     std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index ba56d2cb857..72a0401a180 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -44,6 +44,7 @@
 #include "http/action/health_action.h"
 #include "http/action/http_stream.h"
 #include "http/action/jeprofile_actions.h"
+#include "http/action/load_stream_action.h"
 #include "http/action/meta_action.h"
 #include "http/action/metrics_action.h"
 #include "http/action/pad_rowset_action.h"
@@ -168,6 +169,10 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::GET, 
"/api/query_pipeline_tasks/{query_id}",
                                       query_pipeline_task_action);
 
+    // Register BE LoadStream action
+    LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", 
load_stream_action);
+
     // Register Tablets Info action
     TabletsInfoAction* tablets_info_action =
             _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, 
TPrivilegeType::ADMIN));
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 1cfa0ff0965..4c5b9c33de4 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -200,10 +200,7 @@ PInternalService::PInternalService(ExecEnv* exec_env)
                            config::brpc_light_work_pool_max_queue_size != -1
                                    ? 
config::brpc_light_work_pool_max_queue_size
                                    : std::max(10240, CpuInfo::num_cores() * 
320),
-                           "brpc_light"),
-          _load_stream_mgr(new LoadStreamMgr(
-                  exec_env->store_paths().size() * 
config::flush_thread_num_per_store,
-                  &_heavy_work_pool, &_light_work_pool)) {
+                           "brpc_light") {
     REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
                          [this]() { return _heavy_work_pool.get_queue_size(); 
});
     REGISTER_HOOK_METRIC(light_work_pool_queue_size,
@@ -222,6 +219,9 @@ PInternalService::PInternalService(ExecEnv* exec_env)
     REGISTER_HOOK_METRIC(light_work_max_threads,
                          []() { return config::brpc_light_work_pool_threads; 
});
 
+    _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
+    _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);
+
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, 
AsyncIO::io_ctx_key_deleter));
 }
@@ -378,7 +378,7 @@ void 
PInternalService::open_load_stream(google::protobuf::RpcController* control
         }
 
         LoadStream* load_stream = nullptr;
-        auto st = _load_stream_mgr->open_load_stream(request, load_stream);
+        auto st = _exec_env->load_stream_mgr()->open_load_stream(request, 
load_stream);
         if (!st.ok()) {
             st.to_protobuf(response->mutable_status());
             return;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index fdf3a183d96..4a1f84a6731 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -36,7 +36,6 @@ class StorageEngine;
 class ExecEnv;
 class PHandShakeRequest;
 class PHandShakeResponse;
-class LoadStreamMgr;
 class RuntimeState;
 
 template <typename T>
@@ -266,8 +265,6 @@ protected:
     // otherwise as light interface
     FifoThreadPool _heavy_work_pool;
     FifoThreadPool _light_work_pool;
-
-    std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
 };
 
 // `StorageEngine` mixin for `PInternalService`
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index 7e18553e967..b9f22a4e751 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -603,7 +603,9 @@ public:
 
         
EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok());
 
-        _load_stream_mgr = std::make_unique<LoadStreamMgr>(4, 
&_heavy_work_pool, &_light_work_pool);
+        _load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
+        _load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool);
+        _load_stream_mgr->set_light_work_pool(&_light_work_pool);
         _stream_service = new StreamService(_load_stream_mgr.get());
         CHECK_EQ(0, _server->AddService(_stream_service, 
brpc::SERVER_OWNS_SERVICE));
         brpc::ServerOptions server_options;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to