dataroaring commented on code in PR #17881: URL: https://github.com/apache/doris/pull/17881#discussion_r1193801045
########## be/src/http/action/download_binlog_action.cpp: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/download_binlog_action.h" + +#include <fmt/format.h> +#include <fmt/ranges.h> + +#include <cstdint> +#include <stdexcept> +#include <string_view> +#include <vector> + +#include "common/config.h" +#include "common/logging.h" +#include "http/http_channel.h" +#include "http/http_request.h" +#include "http/utils.h" +#include "io/fs/local_file_system.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_manager.h" +#include "runtime/exec_env.h" + +namespace doris { + +namespace { +const std::string kMethodParameter = "method"; +const std::string kTokenParameter = "token"; +const std::string kTabletIdParameter = "tablet_id"; +const std::string kBinlogVersionParameter = "binlog_version"; +const std::string kRowsetIdParameter = "rowset_id"; +const std::string kSegmentIndexParameter = "segment_index"; + +// get http param, if no value throw exception +const auto& get_http_param(HttpRequest* req, const std::string& param_name) { + const auto& param = req->param(param_name); + if (param.empty()) { + auto error_msg = fmt::format("parameter 
{} not specified in url.", param_name); + throw std::runtime_error(error_msg); + } + return param; +} + +auto get_tablet(const std::string& tablet_id_str) { + int64_t tablet_id = std::atoll(tablet_id_str.data()); + + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + auto error = fmt::format("tablet is not exist, tablet_id={}", tablet_id); + LOG(WARNING) << error; + throw std::runtime_error(error); + } + + return tablet; +} + +// need binlog_version, tablet_id +void handle_get_binlog_info(HttpRequest* req) { + try { + const auto& binlog_version = get_http_param(req, kBinlogVersionParameter); + const auto& tablet_id = get_http_param(req, kTabletIdParameter); + auto tablet = get_tablet(tablet_id); + + const auto& [rowset_id, num_segments] = tablet->get_binlog_info(binlog_version); + auto binlog_info_msg = fmt::format("{}:{}", rowset_id, num_segments); Review Comment: We should return a json message? ########## be/src/http/action/monitor_action.cpp: ########## @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "http/action/monitor_action.h" - -#include <glog/logging.h> - -#include <sstream> -#include <utility> - -#include "http/http_channel.h" -#include "http/http_request.h" -#include "http/http_status.h" -#include "http/rest_monitor_iface.h" - -namespace doris { - -const std::string MODULE_KEY = "module"; - -MonitorAction::MonitorAction() {} - -void MonitorAction::register_module(const std::string& name, RestMonitorIface* module) { - _module_by_name.insert(std::make_pair(name, module)); -} - -void MonitorAction::handle(HttpRequest* req) { - LOG(INFO) << req->debug_string(); - const std::string& module = req->param(MODULE_KEY); - if (module.empty()) { - std::string err_msg = "No module params\n"; - HttpChannel::send_reply(req, HttpStatus::OK, err_msg); - return; - } - if (_module_by_name.find(module) == _module_by_name.end()) { - std::string err_msg = "Unknown module("; - err_msg += module + ")\n"; - HttpChannel::send_reply(req, HttpStatus::OK, err_msg); - return; - } - std::stringstream ss; - _module_by_name[module]->debug(ss); - std::string str = ss.str(); - HttpChannel::send_reply(req, HttpStatus::OK, str); -} - -} // namespace doris Review Comment: Is this action no longer used, and is that why it is being removed? ########## gensrc/thrift/FrontendService.thrift: ########## @@ -523,6 +523,33 @@ struct TLoadTxnBeginResult { 4: optional i64 db_id } +struct Coordinator { + 1: required string host + 2: required i32 port +} Review Comment: This struct duplicates TNetworkAddress.
########## be/src/olap/storage_engine.cpp: ########## @@ -757,6 +769,137 @@ void StorageEngine::_clean_unused_rowset_metas() { } } +void StorageEngine::_gc_binlogs() { + auto data_dirs = get_stores(); + struct tablet_info { + std::string tablet_path; + int64_t binlog_ttl_ms; + }; + std::unordered_map<int64_t, tablet_info> tablets_info; + + auto get_tablet_info = [&tablets_info, this](int64_t tablet_id) -> const tablet_info& { + if (auto iter = tablets_info.find(tablet_id); iter != tablets_info.end()) { + return iter->second; + } + + auto tablet = tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + LOG(WARNING) << "failed to find tablet " << tablet_id; + static tablet_info empty_tablet_info; + return empty_tablet_info; + } + + auto tablet_path = tablet->tablet_path(); + auto binlog_ttl_ms = tablet->binlog_ttl_ms(); + tablets_info.emplace(tablet_id, tablet_info {tablet_path, binlog_ttl_ms}); + return tablets_info[tablet_id]; + }; + + for (auto data_dir : data_dirs) { + std::string prefix_key {kBinlogMetaPrefix}; + OlapMeta* meta = data_dir->get_meta(); + DCHECK(meta != nullptr); + + auto now = now_ms(); + int64_t last_tablet_id = 0; + std::vector<std::string> wait_for_deleted_binlog_keys; + std::vector<std::string> wait_for_deleted_binlog_files; + auto add_to_wait_for_deleted_binlog_keys = + [&wait_for_deleted_binlog_keys](std::string_view key) { + wait_for_deleted_binlog_keys.emplace_back(key); + wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key)); + }; + + auto add_to_wait_for_deleted = [&add_to_wait_for_deleted_binlog_keys, + &wait_for_deleted_binlog_files]( + std::string_view key, std::string_view tablet_path, + int64_t rowset_id, int64_t num_segments) { + add_to_wait_for_deleted_binlog_keys(key); + for (int64_t i = 0; i < num_segments; ++i) { + auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); Review Comment: we should not format file name here and should put format in one place, e.g. 
BetaRowset::segment_file_path ########## be/src/service/backend_service.cpp: ########## @@ -373,4 +380,203 @@ void BackendService::clean_trash() { void BackendService::check_storage_format(TCheckStorageFormatResult& result) { StorageEngine::instance()->tablet_manager()->get_all_tablets_storage_format(&result); } + +void BackendService::ingest_binlog(TIngestBinlogResult& result, + const TIngestBinlogRequest& request) { + int64_t txn_id = request.txn_id; + // Step 1: get local tablet + auto const& local_tablet_id = request.local_tablet_id; + auto local_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id); + if (local_tablet == nullptr) { + LOG(WARNING) << "tablet " << local_tablet_id << " not found"; + result.status.__set_status_code(TStatusCode::RUNTIME_ERROR); Review Comment: Should we return the TABLET_NOT_FOUND error code instead? ########## be/src/olap/olap_meta.cpp: ########## @@ -54,28 +56,9 @@ using namespace ErrorCode; const std::string META_POSTFIX = "/meta"; const size_t PREFIX_LENGTH = 4; -OlapMeta::OlapMeta(const std::string& root_path) : _root_path(root_path), _db(nullptr) {} +OlapMeta::OlapMeta(const std::string& root_path) : _root_path(root_path) {} -OlapMeta::~OlapMeta() { - if (_db != nullptr) { - for (auto& handle : _handles) { - _db->DestroyColumnFamilyHandle(handle); - handle = nullptr; - } - rocksdb::Status s = _db->SyncWAL(); - if (!s.ok()) { - LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString(); - } - rocksdb::CancelAllBackgroundWork(_db, true); Review Comment: I am not sure whether we can safely remove this code.
########## be/src/service/backend_service.cpp: ########## @@ -373,4 +380,203 @@ void BackendService::clean_trash() { void BackendService::check_storage_format(TCheckStorageFormatResult& result) { StorageEngine::instance()->tablet_manager()->get_all_tablets_storage_format(&result); } + +void BackendService::ingest_binlog(TIngestBinlogResult& result, + const TIngestBinlogRequest& request) { + int64_t txn_id = request.txn_id; + // Step 1: get local tablet + auto const& local_tablet_id = request.local_tablet_id; + auto local_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id); + if (local_tablet == nullptr) { + LOG(WARNING) << "tablet " << local_tablet_id << " not found"; + result.status.__set_status_code(TStatusCode::RUNTIME_ERROR); + result.status.error_msgs.emplace_back(fmt::format("tablet {} not found", local_tablet_id)); + return; + } + + // Step 2: get binlog info + auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, + request.remote_port); + constexpr int max_retry = 3; + + auto get_binlog_info_url = + fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url, + "get_binlog_info", request.remote_tablet_id, request.binlog_version); + std::string binlog_info; + auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_binlog_info_url)); + client->set_timeout_ms(10); // 10ms + return client->execute(&binlog_info); + }; + auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url + << ", status=" << status.to_string(); + status.to_thrift(&result.status); + return; + } + + std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, ":"); + // TODO(Drogon): check binlog info content is right + DCHECK(binlog_info_parts.size() == 2); + const std::string& rowset_id = binlog_info_parts[0]; + 
int64_t num_segments = std::stoll(binlog_info_parts[1]); + + // Step 3: get rowset meta + auto get_rowset_meta_url = fmt::format( + "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url, + "get_rowset_meta", request.remote_tablet_id, rowset_id, request.binlog_version); + std::string rowset_meta_str; + auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_rowset_meta_url)); + client->set_timeout_ms(10); // 10ms + return client->execute(&rowset_meta_str); + }; + status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb); Review Comment: We cannot make blocking HTTP requests in a bthread context, because the pthread running the bthread would be blocked. ########## be/src/olap/storage_engine.cpp: ########## @@ -757,6 +769,137 @@ void StorageEngine::_clean_unused_rowset_metas() { } } +void StorageEngine::_gc_binlogs() { + auto data_dirs = get_stores(); + struct tablet_info { + std::string tablet_path; + int64_t binlog_ttl_ms; + }; + std::unordered_map<int64_t, tablet_info> tablets_info; + + auto get_tablet_info = [&tablets_info, this](int64_t tablet_id) -> const tablet_info& { + if (auto iter = tablets_info.find(tablet_id); iter != tablets_info.end()) { + return iter->second; + } + + auto tablet = tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + LOG(WARNING) << "failed to find tablet " << tablet_id; + static tablet_info empty_tablet_info; + return empty_tablet_info; + } + + auto tablet_path = tablet->tablet_path(); + auto binlog_ttl_ms = tablet->binlog_ttl_ms(); + tablets_info.emplace(tablet_id, tablet_info {tablet_path, binlog_ttl_ms}); + return tablets_info[tablet_id]; + }; + + for (auto data_dir : data_dirs) { + std::string prefix_key {kBinlogMetaPrefix}; + OlapMeta* meta = data_dir->get_meta(); + DCHECK(meta != nullptr); + + auto now = now_ms(); + int64_t last_tablet_id = 0; + std::vector<std::string> wait_for_deleted_binlog_keys; + std::vector<std::string>
wait_for_deleted_binlog_files; + auto add_to_wait_for_deleted_binlog_keys = + [&wait_for_deleted_binlog_keys](std::string_view key) { + wait_for_deleted_binlog_keys.emplace_back(key); + wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key)); + }; + + auto add_to_wait_for_deleted = [&add_to_wait_for_deleted_binlog_keys, + &wait_for_deleted_binlog_files]( + std::string_view key, std::string_view tablet_path, + int64_t rowset_id, int64_t num_segments) { + add_to_wait_for_deleted_binlog_keys(key); + for (int64_t i = 0; i < num_segments; ++i) { + auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); + wait_for_deleted_binlog_files.emplace_back( + fmt::format("{}/_binlog/{}", tablet_path, segment_file)); Review Comment: The binlog directory name should be defined globally rather than hard-coded here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org