This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 984ffa00fb4 [feature](merge-cloud) Add cloud meta manager skeleton (#29833) 984ffa00fb4 is described below commit 984ffa00fb41615e77f34c1239a237b361276560 Author: walter <w41te...@gmail.com> AuthorDate: Thu Jan 11 17:52:17 2024 +0800 [feature](merge-cloud) Add cloud meta manager skeleton (#29833) Co-authored-by: plat1ko <platonekos...@gmail.com> Co-authored-by: Gavin Chou <gavineaglec...@gmail.com> Co-authored-by: Xin Liao <liaoxin...@126.com> Co-authored-by: Xiaocc <598887...@qq.com> Co-authored-by: deardeng <565620...@qq.com> Co-authored-by: Lei Zhang <27994433+swjtu-zhang...@users.noreply.github.com> Co-authored-by: Lightman <31928846+lchangli...@users.noreply.github.com> Co-authored-by: Luwei <814383...@qq.com> Co-authored-by: Yongqiang YANG <dataroar...@gmail.com> Co-authored-by: YueW <45946325+tany...@users.noreply.github.com> Co-authored-by: bobhan1 <bh2444151...@outlook.com> --- be/src/cloud/cloud_meta_mgr.cpp | 287 ++++++++++++++++++++++++++++++++++++++++ be/src/cloud/cloud_meta_mgr.h | 76 +++++++++++ be/src/cloud/config.cpp | 9 ++ be/src/cloud/config.h | 14 ++ be/src/cloud/meta_mgr.h | 86 ++++++++++++ be/src/util/network_util.cpp | 39 ++++++ be/src/util/network_util.h | 2 + 7 files changed, 513 insertions(+) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp new file mode 100644 index 00000000000..f2fcf5132f6 --- /dev/null +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "cloud/cloud_meta_mgr.h"
+
+#include <brpc/channel.h>
+#include <brpc/controller.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <climits>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <thread>
+#include <vector>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/network_util.h"
+#include "util/s3_util.h"
+
+namespace doris::cloud {
+using namespace ErrorCode;
+
+bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
+
+// Hands out `MetaService_Stub`s used to call the meta service.
+//
+// A process-wide array of proxies is created on first use: one slot when
+// `config::meta_service_connection_pooled` is false, otherwise
+// `config::meta_service_connection_pool_size` slots (at least one). Each slot caches a
+// stub and rebuilds it once its connection-age deadline has passed or once it has been
+// idle for longer than `config::meta_service_idle_connection_timeout_ms`.
+class MetaServiceProxy {
+public:
+    // Stores a usable stub into `*stub`, or returns the error from building one.
+    static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
+        SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
+        return get_pooled_client(stub);
+    }
+
+private:
+    // Picks pool slots round-robin. Up to `num_proxies - 1` slots are tried and the
+    // first success wins; otherwise the status of one final attempt is returned.
+    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
+        static std::once_flag proxies_flag;
+        static size_t num_proxies = 1;
+        static std::atomic<size_t> index(0);
+        static std::unique_ptr<MetaServiceProxy[]> proxies;
+
+        std::call_once(
+                proxies_flag, +[]() {
+                    if (config::meta_service_connection_pooled) {
+                        // A non-positive configured size would make the `% num_proxies`
+                        // below divide by zero (or, when negative, wrap to a huge
+                        // size_t allocation), so always keep at least one slot.
+                        num_proxies = static_cast<size_t>(std::max<int64_t>(
+                                1, config::meta_service_connection_pool_size));
+                    }
+                    proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
+                });
+
+        for (size_t i = 0; i + 1 < num_proxies; ++i) {
+            size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
+            Status s = proxies[next_index].get(stub);
+            if (s.ok()) return Status::OK();
+        }
+
+        size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
+        return proxies[next_index].get(stub);
+    }
+
+    // Initializes `channel` against the resolved meta service address. Every channel
+    // is given its own brpc `connection_group` ("ms_<n>"); brpc does not share
+    // connections between channels in different groups.
+    static Status init_channel(brpc::Channel* channel) {
+        static std::atomic<size_t> index = 1;
+
+        std::string ip;
+        uint16_t port;
+        Status s = get_meta_service_ip_and_port(&ip, &port);
+        if (!s.ok()) {
+            LOG(WARNING) << "fail to get meta service ip and port: " << s;
+            return s;
+        }
+
+        size_t next_id = index.fetch_add(1, std::memory_order_relaxed);
+        brpc::ChannelOptions options;
+        options.connection_group = fmt::format("ms_{}", next_id);
+        if (channel->Init(ip.c_str(), port, &options) != 0) {
+            return Status::InternalError("fail to init brpc channel, ip: {}, port: {}", ip, port);
+        }
+        return Status::OK();
+    }
+
+    // Parses `config::meta_service_endpoint` ("host:port") and resolves the host to an
+    // IP address unless it already is a literal IP.
+    static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) {
+        std::string parsed_host;
+        if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) {
+            return Status::InvalidArgument("invalid meta service endpoint: {}",
+                                           config::meta_service_endpoint);
+        }
+        if (is_valid_ip(parsed_host)) {
+            *ip = std::move(parsed_host);
+            return Status::OK();
+        }
+        return hostname_to_ip(parsed_host, *ip);
+    }
+
+    // True when idle rebuilding is enabled (timeout > 0) and this slot has not handed
+    // out a stub for longer than the timeout. `now` is milliseconds since the epoch.
+    bool is_idle_timeout(long now) {
+        auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
+        return idle_timeout_ms > 0 &&
+               _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
+    }
+
+    // Returns the cached stub while it is within its deadline and not idle; otherwise
+    // builds a new channel + stub and installs it as this slot's cached stub.
+    Status get(std::shared_ptr<MetaService_Stub>* stub) {
+        using namespace std::chrono;
+
+        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+        {
+            std::shared_lock lock(_mutex);
+            if (_deadline_ms >= now && !is_idle_timeout(now)) {
+                _last_access_at_ms.store(now, std::memory_order_relaxed);
+                *stub = _stub;
+                return Status::OK();
+            }
+        }
+
+        auto channel = std::make_unique<brpc::Channel>();
+        Status s = init_channel(channel.get());
+        if (UNLIKELY(!s.ok())) {
+            return s;
+        }
+
+        *stub = std::make_shared<MetaService_Stub>(channel.release(),
+                                                   google::protobuf::Service::STUB_OWNS_CHANNEL);
+
+        long deadline = now;
+        if (config::meta_service_connection_age_base_minutes > 0) {
+            // Expire the connection after a random age in [base, 2 * base] minutes.
+            std::default_random_engine rng(static_cast<uint32_t>(now));
+            std::uniform_int_distribution<> uni(
+                    config::meta_service_connection_age_base_minutes,
+                    config::meta_service_connection_age_base_minutes * 2);
+            deadline = now + duration_cast<milliseconds>(minutes(uni(rng))).count();
+        } else {
+            deadline = LONG_MAX;
+        }
+
+        // Last one WIN: concurrent rebuilders all return their own stub, and the final
+        // writer's stub becomes the cached one.
+        std::unique_lock lock(_mutex);
+        _last_access_at_ms.store(now, std::memory_order_relaxed);
+        _deadline_ms = deadline;
+        _stub = *stub;
+        return Status::OK();
+    }
+
+    std::shared_mutex _mutex;
+    // Milliseconds since the epoch at which this slot last handed out a stub.
+    std::atomic<long> _last_access_at_ms {0};
+    // The cached stub is reused only while now <= _deadline_ms (guarded by _mutex).
+    long _deadline_ms {0};
+    std::shared_ptr<MetaService_Stub> _stub;
+};
+
+// Fetches the meta of `tablet_id` from the meta service into `*tablet_meta`.
+//
+// brpc-level failures (`cntl.Failed()`) are retried up to
+// `config::meta_service_rpc_retry_times` times with a randomized sleep in between.
+// Returns NotFound for TABLET_NOT_FOUND, InternalError for any other non-OK meta
+// service code, RpcError once retries are exhausted, and any error from acquiring a
+// stub as-is.
+Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
+    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
+                                      tablet_meta);
+
+    std::shared_ptr<MetaService_Stub> stub;
+    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+    int tried = 0;
+    while (true) {
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+        GetTabletRequest req;
+        GetTabletResponse resp;
+        req.set_cloud_unique_id(config::cloud_unique_id);
+        req.set_tablet_id(tablet_id);
+        stub->get_tablet(&cntl, &req, &resp, nullptr);
+        int retry_times = config::meta_service_rpc_retry_times;
+        if (cntl.Failed()) {
+            if (tried++ < retry_times) {
+                auto rng = std::default_random_engine(static_cast<uint32_t>(
+                        std::chrono::steady_clock::now().time_since_epoch().count()));
+                std::uniform_int_distribution<uint32_t> u(20, 200);
+                std::uniform_int_distribution<uint32_t> u1(500, 1000);
+                // Retry quickly (20-200ms) during the first 100 attempts, then back off
+                // to 500-1000ms so a long outage does not keep hammering the service.
+                uint32_t duration_ms = tried <= 100 ? u(rng) : u1(rng);
+                std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+                LOG_INFO("failed to get tablet meta")
+                        .tag("reason", cntl.ErrorText())
+                        .tag("tablet_id", tablet_id)
+                        .tag("tried", tried)
+                        .tag("sleep", duration_ms);
+                continue;
+            }
+            return Status::RpcError("failed to get tablet meta: {}", cntl.ErrorText());
+        }
+        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+            return Status::NotFound("failed to get tablet meta: {}", resp.status().msg());
+        }
+        if (resp.status().code() != MetaServiceCode::OK) {
+            return Status::InternalError("failed to get tablet meta: {}", resp.status().msg());
+        }
+        *tablet_meta = std::make_shared<TabletMeta>();
+        (*tablet_meta)->init_from_pb(resp.tablet_meta());
+        VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id();
+        return Status::OK();
+    }
+}
+
+// The operations below are not implemented yet; each returns NotSupported.
+
+Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data) {
+    return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not implemented");
+}
+
+Status CloudMetaMgr::sync_tablet_delete_bitmap(
+        Tablet* tablet, int64_t old_max_version,
+        const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+        const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap) {
+    return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented");
+}
+
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                    RowsetMetaSharedPtr* existed_rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::prepare_rowset is not implemented");
+}
+
+Status CloudMetaMgr::commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                   RowsetMetaSharedPtr* existed_rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::commit_rowset is not implemented");
+}
+
+Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented");
+}
+
+Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) {
+    return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented");
+}
+
+Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) {
+    return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented");
+}
+
+Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) {
+    return Status::NotSupported("CloudMetaMgr::precommit_txn is not implemented");
+}
+
+Status CloudMetaMgr::get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) {
+    return Status::NotSupported("CloudMetaMgr::get_s3_info is not implemented");
+}
+
+Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) {
+    return Status::NotSupported("CloudMetaMgr::prepare_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) {
+    return Status::NotSupported("CloudMetaMgr::commit_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
+    return Status::NotSupported("CloudMetaMgr::abort_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
+    return Status::NotSupported("CloudMetaMgr::lease_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) {
+    return Status::NotSupported("CloudMetaMgr::update_tablet_schema is not implemented");
+}
+
+Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                          DeleteBitmap* delete_bitmap) {
+    return Status::NotSupported("CloudMetaMgr::update_delete_bitmap is not implemented");
+}
+
+Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                                   int64_t initiator) {
+    return Status::NotSupported("CloudMetaMgr::get_delete_bitmap_update_lock is not implemented");
+}
+
+} // namespace doris::cloud
diff --git
a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
new file mode 100644
index 00000000000..fe65e0441ff
--- /dev/null
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "cloud/meta_mgr.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris::cloud {
+class TabletStatsPB;
+class TabletIndexPB;
+
+// `MetaMgr` implementation backed by the cloud meta service (brpc `MetaService_Stub`).
+// In this skeleton only `get_tablet_meta` is implemented; every other operation
+// currently returns `Status::NotSupported`.
+class CloudMetaMgr final : public MetaMgr {
+public:
+    CloudMetaMgr() = default;
+    ~CloudMetaMgr() override = default;
+    CloudMetaMgr(const CloudMetaMgr&) = delete;
+    CloudMetaMgr& operator=(const CloudMetaMgr&) = delete;
+
+    Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta) override;
+
+    Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false) override;
+
+    Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                          std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) override;
+
+    Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                         std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) override;
+
+    Status update_tmp_rowset(const RowsetMeta& rs_meta) override;
+
+    Status commit_txn(StreamLoadContext* ctx, bool is_2pc) override;
+
+    Status abort_txn(StreamLoadContext* ctx) override;
+
+    Status precommit_txn(StreamLoadContext* ctx) override;
+
+    Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) override;
+
+    Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) override;
+
+    Status commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) override;
+
+    Status abort_tablet_job(const TabletJobInfoPB& job) override;
+
+    Status lease_tablet_job(const TabletJobInfoPB& job) override;
+
+    Status update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) override;
+
+    Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                DeleteBitmap* delete_bitmap) override;
+
+    Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                         int64_t initiator) override;
+
+private:
+    Status sync_tablet_delete_bitmap(
+            Tablet* tablet, int64_t old_max_version,
+            const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+            const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
+};
+
+} // namespace doris::cloud
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 12a217dfcd0..d7513574037 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -20,5 +20,14 @@
 namespace doris::config {
 
 DEFINE_String(cloud_unique_id, "");
+DEFINE_String(meta_service_endpoint, "");
+DEFINE_Bool(meta_service_use_load_balancer, "false");
+DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
+DEFINE_Bool(meta_service_connection_pooled, "true");
+DEFINE_mInt64(meta_service_connection_pool_size, "20");
+DEFINE_mInt32(meta_service_connection_age_base_minutes, "5");
+DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0");
+DEFINE_mInt32(meta_service_rpc_retry_times, "200");
+DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000");
 
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 0044ab11458..0a2ceab3e5a 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -27,4 +27,23 @@ static inline bool is_cloud_mode() {
     return !cloud_unique_id.empty();
 }
 
+DECLARE_String(meta_service_endpoint);
+DECLARE_Bool(meta_service_use_load_balancer);
+// Set the underlying connection type to pooled.
+DECLARE_Bool(meta_service_connection_pooled);
+// Number of pooled connections, used only when meta_service_connection_pooled is true.
+DECLARE_mInt64(meta_service_connection_pool_size);
+// A connection will expire after a random time during [base, 2*base] minutes, so that
+// the BE has a chance to connect to a new RS (real server behind the endpoint). Set
+// zero to disable it.
+DECLARE_mInt32(meta_service_connection_age_base_minutes);
+// Rebuild a connection once it has been idle for longer than this many milliseconds.
+// Set zero to disable it.
+DECLARE_mInt32(meta_service_idle_connection_timeout_ms);
+DECLARE_mInt32(meta_service_rpc_timeout_ms);
+// How many times a meta service RPC is retried after a brpc-level failure.
+DECLARE_mInt32(meta_service_rpc_retry_times);
+// Timeout in milliseconds of each brpc call to the meta service.
+DECLARE_mInt32(meta_service_brpc_timeout_ms);
+
 } // namespace doris::config
diff --git a/be/src/cloud/meta_mgr.h b/be/src/cloud/meta_mgr.h
new file mode 100644
index 00000000000..c573d43ff76
--- /dev/null
+++ b/be/src/cloud/meta_mgr.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "common/status.h"
+#include "util/s3_util.h"
+
+namespace doris {
+class StreamLoadContext;
+class Tablet;
+class TabletMeta;
+class RowsetMeta;
+class TabletSchema;
+class DeleteBitmap;
+
+namespace cloud {
+
+class TabletJobInfoPB;
+class StartTabletJobResponse;
+class FinishTabletJobResponse;
+
+// Abstract interface for the BE's metadata operations on tablets, rowsets,
+// transactions, tablet jobs and delete bitmaps. `CloudMetaMgr`
+// (cloud/cloud_meta_mgr.h) implements it on top of the cloud meta service.
+class MetaMgr {
+public:
+    virtual ~MetaMgr() = default;
+
+    // Optional initialization hook; the default implementation does nothing.
+    virtual Status open() { return Status::OK(); }
+
+    virtual Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta) = 0;
+
+    // If `warmup_delta_data` is true, download the new version rowset data in the background.
+    virtual Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false) = 0;
+
+    virtual Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                  std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) = 0;
+
+    virtual Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                 std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) = 0;
+
+    virtual Status update_tmp_rowset(const RowsetMeta& rs_meta) = 0;
+
+    virtual Status commit_txn(StreamLoadContext* ctx, bool is_2pc) = 0;
+
+    virtual Status abort_txn(StreamLoadContext* ctx) = 0;
+
+    virtual Status precommit_txn(StreamLoadContext* ctx) = 0;
+
+    virtual Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) = 0;
+
+    virtual Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) = 0;
+
+    virtual Status commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) = 0;
+
+    virtual Status abort_tablet_job(const TabletJobInfoPB& job) = 0;
+
+    virtual Status lease_tablet_job(const TabletJobInfoPB& job) = 0;
+
+    virtual Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                        DeleteBitmap* delete_bitmap) = 0;
+
+    virtual Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                                 int64_t initiator) = 0;
+
+    virtual Status update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) = 0;
+};
+
+} // namespace cloud
+} // namespace doris
diff --git a/be/src/util/network_util.cpp b/be/src/util/network_util.cpp
index e7953c341d4..3d93c2e183f 100644
--- a/be/src/util/network_util.cpp
+++ b/be/src/util/network_util.cpp
@@ -78,6 +78,51 @@ bool is_valid_ip(const std::string& ip) {
     return (inet_pton(AF_INET6, ip.data(), buf) > 0) || (inet_pton(AF_INET, ip.data(), buf) > 0);
 }
 
+// Splits `endpoint` ("host:port", or "[ipv6-addr]:port") at its last ':' into
+// `*host` and `*port`. Whitespace before the host and after the port is ignored.
+// Returns false, leaving the outputs untouched, if there is no ':' separator, the
+// port is not a decimal number in [0, 65535], or the host is empty.
+bool parse_endpoint(const std::string& endpoint, std::string* host, uint16_t* port) {
+    auto p = endpoint.find_last_of(':');
+    if (p == std::string::npos || p + 1 == endpoint.size()) {
+        return false;
+    }
+
+    const char* port_base = endpoint.c_str() + p + 1;
+    char* end = nullptr;
+    long value = strtol(port_base, &end, 10);
+    if (port_base == end) {
+        return false;
+    }
+    // Only trailing whitespace may follow the port digits. The cast keeps
+    // std::isspace well-defined for chars with the high bit set.
+    while (std::isspace(static_cast<unsigned char>(*end))) {
+        end++;
+    }
+    // Range-check unconditionally: "host:99999 " must be rejected rather than
+    // silently wrapped when narrowed to uint16_t.
+    if (*end != '\0' || value < 0 || value > 65535) {
+        return false;
+    }
+
+    std::string::size_type i = 0;
+    const char* host_base = endpoint.c_str();
+    while (std::isspace(static_cast<unsigned char>(host_base[i]))) {
+        i++;
+    }
+    // Strip the brackets around an IPv6 literal such as "[::1]:9010".
+    if (i < p && host_base[i] == '[' && host_base[p - 1] == ']') {
+        i += 1;
+        p -= 1;
+    }
+    if (i >= p) {
+        return false;
+    }
+    *host = endpoint.substr(i, p - i);
+    *port = static_cast<uint16_t>(value);
+    return true;
+}
+
 Status hostname_to_ip(const std::string& host, std::string& ip) {
     auto start = std::chrono::high_resolution_clock::now();
     Status status = hostname_to_ipv4(host, ip);
diff --git a/be/src/util/network_util.h b/be/src/util/network_util.h
index a9541586ef5..fe8864bd1bd 100644
--- a/be/src/util/network_util.h
+++ b/be/src/util/network_util.h
@@ -45,6 +45,9 @@ private:
 
 bool is_valid_ip(const std::string& ip);
 
+// Parses "host:port" (or "[ipv6]:port"); returns false if `endpoint` is malformed.
+bool parse_endpoint(const std::string& endpoint, std::string* host, uint16_t* port);
+
 Status hostname_to_ip(const std::string& host, std::string& ip);
 
 Status hostname_to_ipv4(const std::string& host, std::string& ip);
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org