This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 2cc68381ec2 [feature](binlog) Add ingest_binlog/http_get_snapshot limit download speed && Add async ingest_binlog (#26323)
2cc68381ec2 is described below

commit 2cc68381ec2eb3415ea8588b47c31a8c36d8a2cc
Author: Jack Drogon <jack.xsuper...@gmail.com>
AuthorDate: Mon Nov 6 11:14:44 2023 +0800

    [feature](binlog) Add ingest_binlog/http_get_snapshot limit download speed && Add async ingest_binlog (#26323)
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/http/action/download_action.cpp             |  29 +-
 be/src/http/action/download_action.h               |   9 +-
 be/src/http/action/download_binlog_action.cpp      |  12 +-
 be/src/http/action/download_binlog_action.h        |   7 +-
 be/src/http/ev_http_server.cpp                     |  58 +-
 be/src/http/ev_http_server.h                       |   5 +
 be/src/http/http_channel.cpp                       |  13 +-
 be/src/http/http_channel.h                         |   4 +-
 be/src/http/utils.cpp                              |   5 +-
 be/src/http/utils.h                                |   5 +-
 be/src/olap/txn_manager.cpp                        | 156 ++++--
 be/src/olap/txn_manager.h                          |  32 +-
 be/src/runtime/snapshot_loader.cpp                 |   2 +-
 be/src/service/backend_service.cpp                 | 599 +++++++++++++--------
 be/src/service/backend_service.h                   |   5 +
 be/src/service/http_service.cpp                    |  40 +-
 be/src/service/http_service.h                      |   4 +
 .../org/apache/doris/common/GenericPoolTest.java   |   8 +
 .../apache/doris/utframe/MockedBackendFactory.java |   8 +
 gensrc/thrift/BackendService.thrift                |  23 +
 .../apache/doris/regression/suite/Syncer.groovy    |   3 +-
 .../suites/ccr_syncer_p0/test_ingest_binlog.groovy |   4 +-
 24 files changed, 710 insertions(+), 333 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index deda8d3d6ff..8c57e9c8b71 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,6 +1121,12 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DEFINE_Int32(group_commit_max_queue_size, "65536");
 
+// Ingest binlog work pool size, -1 means disabled, 0 means use hardware concurrency
+DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+
+// Download binlog rate limit, unit is KB/s, 0 means no limit
+DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4c18f5e001d..e23fa5c357b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1191,6 +1191,12 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size); +// Ingest binlog work pool size +DECLARE_Int32(ingest_binlog_work_pool_size); + +// Download binlog rate limit, unit is KB/s +DECLARE_Int32(download_binlog_rate_limit_kbs); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/http/action/download_action.cpp b/be/src/http/action/download_action.cpp index f2068d83c7b..f271b4f1916 100644 --- a/be/src/http/action/download_action.cpp +++ b/be/src/http/action/download_action.cpp @@ -33,13 +33,20 @@ #include "runtime/exec_env.h" namespace doris { - -const std::string FILE_PARAMETER = "file"; -const std::string TOKEN_PARAMETER = "token"; - -DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs, - int32_t num_workers) - : _exec_env(exec_env), _download_type(NORMAL), _num_workers(num_workers) { +namespace { +static const std::string FILE_PARAMETER = "file"; +static const std::string TOKEN_PARAMETER = "token"; +static const std::string CHANNEL_PARAMETER = "channel"; +static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; +} // namespace + +DownloadAction::DownloadAction(ExecEnv* exec_env, + std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group, + const std::vector<std::string>& allow_dirs, int32_t num_workers) + : _exec_env(exec_env), + _download_type(NORMAL), + _num_workers(num_workers), + _rate_limit_group(std::move(rate_limit_group)) { for (auto& dir : allow_dirs) { std::string p; Status st = io::global_local_filesystem()->canonicalize(dir, &p); @@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par if (is_dir) { do_dir_response(file_param, req); } else { - do_file_response(file_param, req); + const auto& channel = req->param(CHANNEL_PARAMETER); + bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE); + if (ingest_binlog) { + do_file_response(file_param, req, _rate_limit_group.get()); + } else { + do_file_response(file_param, req); + } } } diff --git a/be/src/http/action/download_action.h b/be/src/http/action/download_action.h index d8e468d9585..3aab1a0d314 100644 --- a/be/src/http/action/download_action.h +++ b/be/src/http/action/download_action.h @@ -24,6 +24,8 @@ #include "http/http_handler.h" #include "util/threadpool.h" +struct bufferevent_rate_limit_group; + namespace doris { class ExecEnv; @@ -36,8 +38,9 @@ class HttpRequest; // We use parameter named 'file' to specify the static resource path, it is an absolute path. 
class DownloadAction : public HttpHandler { public: - DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs, - int32_t num_workers = 0); + DownloadAction(ExecEnv* exec_env, + std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group, + const std::vector<std::string>& allow_dirs, int32_t num_workers = 0); // for load error DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir); @@ -67,6 +70,8 @@ private: std::string _error_log_root_dir; int32_t _num_workers; std::unique_ptr<ThreadPool> _download_workers; + + std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr}; }; // end class DownloadAction } // end namespace doris diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index a23d5ec109f..697512b2a30 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -21,8 +21,10 @@ #include <fmt/ranges.h> #include <cstdint> +#include <limits> #include <stdexcept> #include <string_view> +#include <utility> #include <vector> #include "common/config.h" @@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) { } /// handle get segment file, need tablet_id, rowset_id && index -void handle_get_segment_file(HttpRequest* req) { +void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) { // Step 1: get download file path std::string segment_file_path; try { @@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) { LOG(WARNING) << "file not exist, file path: " << segment_file_path; return; } - do_file_response(segment_file_path, req); + do_file_response(segment_file_path, req, rate_limit_group); } void handle_get_rowset_meta(HttpRequest* req) { @@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) { } // namespace -DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : _exec_env(exec_env) {} +DownloadBinlogAction::DownloadBinlogAction( + ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group) + : _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) {} void DownloadBinlogAction::handle(HttpRequest* req) { VLOG_CRITICAL << "accept one download binlog request " << req->debug_string(); @@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) { if (method == "get_binlog_info") { handle_get_binlog_info(req); } else if (method == "get_segment_file") { - handle_get_segment_file(req); + handle_get_segment_file(req, _rate_limit_group.get()); } else if (method == "get_rowset_meta") { handle_get_rowset_meta(req); } else { diff --git a/be/src/http/action/download_binlog_action.h b/be/src/http/action/download_binlog_action.h index 3cbd9b9e5b0..77a2ed08780 100644 --- a/be/src/http/action/download_binlog_action.h +++ b/be/src/http/action/download_binlog_action.h @@ -17,12 +17,15 @@ #pragma once +#include <memory> #include <string> #include <vector> #include "common/status.h" #include "http/http_handler.h" +struct bufferevent_rate_limit_group; + namespace doris { class ExecEnv; @@ -30,7 +33,8 @@ class HttpRequest; class DownloadBinlogAction : public HttpHandler { public: - DownloadBinlogAction(ExecEnv* exec_env); + DownloadBinlogAction(ExecEnv* exec_env, + std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group); virtual ~DownloadBinlogAction() = default; void handle(HttpRequest* req) override; @@ -40,6 +44,7 @@ private: private: ExecEnv* _exec_env; + std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group 
{nullptr}; }; } // namespace doris diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 5d231cb9087..1bbd2c0e178 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* param) { EvHttpServer::EvHttpServer(int port, int num_workers) : _port(port), _num_workers(num_workers), _real_port(0) { _host = BackendOptions::get_service_bind_address(); + + evthread_use_pthreads(); DCHECK_GT(_num_workers, 0); + _event_bases.resize(_num_workers); + for (int i = 0; i < _num_workers; ++i) { + std::shared_ptr<event_base> base(event_base_new(), + [](event_base* base) { event_base_free(base); }); + CHECK(base != nullptr) << "Couldn't create an event_base."; + std::lock_guard lock(_event_bases_lock); + _event_bases[i] = base; + } } EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers) @@ -107,34 +117,28 @@ void EvHttpServer::start() { .set_min_threads(_num_workers) .set_max_threads(_num_workers) .build(&_workers)); - - evthread_use_pthreads(); - _event_bases.resize(_num_workers); for (int i = 0; i < _num_workers; ++i) { - CHECK(_workers->submit_func([this, i]() { - std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) { - event_base_free(base); - }); - CHECK(base != nullptr) << "Couldn't create an event_base."; - { - std::lock_guard<std::mutex> lock(_event_bases_lock); - _event_bases[i] = base; - } - - /* Create a new evhttp object to handle requests. */ - std::shared_ptr<evhttp> http(evhttp_new(base.get()), - [](evhttp* http) { evhttp_free(http); }); - CHECK(http != nullptr) << "Couldn't create an evhttp."; - - auto res = evhttp_accept_socket(http.get(), _server_fd); - CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; - - evhttp_set_newreqcb(http.get(), on_connection, this); - evhttp_set_gencb(http.get(), on_request, this); - - event_base_dispatch(base.get()); - }) - .ok()); + auto status = _workers->submit_func([this, i]() { + std::shared_ptr<event_base> base; + { + std::lock_guard lock(_event_bases_lock); + base = _event_bases[i]; + } + + /* Create a new evhttp object to handle requests. 
*/ + std::shared_ptr<evhttp> http(evhttp_new(base.get()), + [](evhttp* http) { evhttp_free(http); }); + CHECK(http != nullptr) << "Couldn't create an evhttp."; + + auto res = evhttp_accept_socket(http.get(), _server_fd); + CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; + + evhttp_set_newreqcb(http.get(), on_connection, this); + evhttp_set_gencb(http.get(), on_request, this); + + event_base_dispatch(base.get()); + }); + CHECK(status.ok()); } } diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h index e7ad1c052ab..d74a8cb4efd 100644 --- a/be/src/http/ev_http_server.h +++ b/be/src/http/ev_http_server.h @@ -55,6 +55,11 @@ public: // get real port int get_real_port() const { return _real_port; } + std::vector<std::shared_ptr<event_base>> get_event_bases() { + std::lock_guard lock(_event_bases_lock); + return _event_bases; + } + private: Status _bind(); HttpHandler* _find_handler(HttpRequest* req); diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 5727ba3902e..96679195316 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -18,6 +18,7 @@ #include "http/http_channel.h" #include <event2/buffer.h> +#include <event2/bufferevent.h> #include <event2/http.h> #include <algorithm> @@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std: evbuffer_free(evb); } -void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) { +void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size, + bufferevent_rate_limit_group* rate_limit_group) { auto evb = evbuffer_new(); evbuffer_add_file(evb, fd, off, size); - evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK, - default_reason(HttpStatus::OK).c_str(), evb); + auto* evhttp_request = request->get_evhttp_request(); + if (rate_limit_group) { + auto* evhttp_connection = evhttp_request_get_connection(evhttp_request); + auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection); + bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); + } + evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb); evbuffer_free(evb); } diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h index 478f013af82..ee1e6c0888f 100644 --- a/be/src/http/http_channel.h +++ b/be/src/http/http_channel.h @@ -23,6 +23,7 @@ #include "http/http_status.h" +struct bufferevent_rate_limit_group; namespace doris { class HttpRequest; @@ -43,7 +44,8 @@ public: static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content); - static void send_file(HttpRequest* request, int fd, size_t off, size_t size); + static void send_file(HttpRequest* request, int fd, size_t off, size_t size, + bufferevent_rate_limit_group* rate_limit_group = nullptr); static bool compress_content(const std::string& accept_encoding, const std::string& input, std::string* output); diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index f55b5d47696..31550456c55 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) { return ""; } -void do_file_response(const std::string& file_path, HttpRequest* req) { +void do_file_response(const std::string& file_path, HttpRequest* req, + bufferevent_rate_limit_group* rate_limit_group) { if (file_path.find("..") != std::string::npos) { LOG(WARNING) << "Not allowed to read relative path: " << file_path; 
HttpChannel::send_error(req, HttpStatus::FORBIDDEN); @@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, HttpRequest* req) { return; } - HttpChannel::send_file(req, fd, 0, file_size); + HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group); } void do_dir_response(const std::string& dir_path, HttpRequest* req) { diff --git a/be/src/http/utils.h b/be/src/http/utils.h index 5928039c492..2d1e13fbe4e 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -22,6 +22,8 @@ #include "common/utils.h" #include "http/http_request.h" +struct bufferevent_rate_limit_group; + namespace doris { struct AuthInfo; @@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth); -void do_file_response(const std::string& dir_path, HttpRequest* req); +void do_file_response(const std::string& dir_path, HttpRequest* req, + bufferevent_rate_limit_group* rate_limit_group = nullptr); void do_dir_response(const std::string& dir_path, HttpRequest* req); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 1ec7899b460..af6b0d67235 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -140,6 +140,7 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac // case 1: user start a new txn, rowset = null // case 2: loading txn from meta env TabletTxnInfo load_info(load_id, nullptr, ingest); + load_info.prepare(); txn_tablet_map[key][tablet_info] = load_info; _insert_txn_partition_map_unlocked(transaction_id, partition_id); VLOG_NOTICE << "add transaction to engine successfully." @@ -162,6 +163,29 @@ Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet->tablet_id(), tablet->tablet_uid(), version, stats); } +void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, TabletUid tablet_uid) { + pair<int64_t, int64_t> key(partition_id, transaction_id); + TabletInfo tablet_info(tablet_id, tablet_uid); + + std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); + + auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it == txn_tablet_map.end()) { + return; + } + + auto& tablet_txn_info_map = it->second; + if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); + tablet_txn_info_iter == tablet_txn_info_map.end()) { + return; + } else { + auto& txn_info = tablet_txn_info_iter->second; + txn_info.abort(); + } +} + // delete the txn from manager if it is not committed(not have a valid rowset) Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet, TTransactionId transaction_id) { @@ -217,6 +241,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, << " partition_id=" << partition_id << " transaction_id=" << transaction_id << " tablet_id=" << tablet_id; } + pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, tablet_uid); if (rowset_ptr == nullptr) { @@ -252,28 +277,30 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, // case 1: user commit rowset, then the load id must be equal TabletTxnInfo& load_info = load_itr->second; // check if load id is equal - if (load_info.load_id.hi() == load_id.hi() && load_info.load_id.lo() == load_id.lo() && - load_info.rowset != nullptr && - load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) { - // find a rowset with same 
rowset id, then it means a duplicate call + if (load_info.rowset == nullptr) { + break; + } + + if (load_info.load_id.hi() != load_id.hi() || load_info.load_id.lo() != load_id.lo()) { + break; + } + + // find a rowset with same rowset id, then it means a duplicate call + if (load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) { LOG(INFO) << "find rowset exists when commit transaction to engine." << "partition_id: " << key.first << ", transaction_id: " << key.second << ", tablet: " << tablet_info.to_string() << ", rowset_id: " << load_info.rowset->rowset_id(); return Status::OK(); - } else if (load_info.load_id.hi() == load_id.hi() && - load_info.load_id.lo() == load_id.lo() && load_info.rowset != nullptr && - load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) { - // find a rowset with different rowset id, then it should not happen, just return errors - return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( - "find rowset exists when commit transaction to engine. but rowset ids are not " - "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, " - "new rowset_id: {}", - key.first, key.second, tablet_info.to_string(), - load_info.rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); - } else { - break; } + + // find a rowset with different rowset id, then it should not happen, just return errors + return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( + "find rowset exists when commit transaction to engine. but rowset ids are not " + "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new " + "rowset_id: {}", + key.first, key.second, tablet_info.to_string(), + load_info.rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); } while (false); // if not in recovery mode, then should persist the meta to meta env @@ -301,6 +328,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, load_info.delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); } } + load_info.commit(); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); txn_tablet_map[key][tablet_info] = load_info; _insert_txn_partition_map_unlocked(transaction_id, partition_id); @@ -453,30 +482,36 @@ Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transa TTabletId tablet_id, TabletUid tablet_uid) { pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, tablet_uid); + std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); - if (it != txn_tablet_map.end()) { - auto load_itr = it->second.find(tablet_info); - if (load_itr != it->second.end()) { - // found load for txn,tablet - // case 1: user commit rowset, then the load id must be equal - TabletTxnInfo& load_info = load_itr->second; - if (load_info.rowset != nullptr) { - return Status::Error<TRANSACTION_ALREADY_COMMITTED>( - "if rowset is not null, it means other thread may commit the rowset should " - "not delete txn any more"); - } - } - it->second.erase(tablet_info); - LOG(INFO) << "rollback transaction from engine successfully." 
- << " partition_id: " << key.first << ", transaction_id: " << key.second - << ", tablet: " << tablet_info.to_string(); - if (it->second.empty()) { - txn_tablet_map.erase(it); - _clear_txn_partition_map_unlocked(transaction_id, partition_id); + if (it == txn_tablet_map.end()) { + return Status::OK(); + } + + auto& tablet_txn_info_map = it->second; + if (auto load_itr = tablet_txn_info_map.find(tablet_info); + load_itr != tablet_txn_info_map.end()) { + // found load for txn,tablet + // case 1: user commit rowset, then the load id must be equal + TabletTxnInfo& load_info = load_itr->second; + if (load_info.rowset != nullptr) { + return Status::Error<TRANSACTION_ALREADY_COMMITTED>( + "if rowset is not null, it means other thread may commit the rowset should " + "not delete txn any more"); } } + + tablet_txn_info_map.erase(tablet_info); + LOG(INFO) << "rollback transaction from engine successfully." + << " partition_id: " << key.first << ", transaction_id: " << key.second + << ", tablet: " << tablet_info.to_string(); + if (tablet_txn_info_map.empty()) { + txn_tablet_map.erase(it); + _clear_txn_partition_map_unlocked(transaction_id, partition_id); + } return Status::OK(); } @@ -650,18 +685,6 @@ void TxnManager::get_all_commit_tablet_txn_info_by_tablet( } } -bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_id, - TTabletId tablet_id, TabletUid tablet_uid) { - pair<int64_t, int64_t> key(partition_id, transaction_id); - TabletInfo tablet_info(tablet_id, tablet_uid); - std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); - txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); - auto it = txn_tablet_map.find(key); - bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) != it->second.end(); - - return found; -} - void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { int64_t now = UnixSeconds(); // traverse the txn map, and get all expired txns @@ -671,13 +694,15 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>> auto txn_id = it.first.second; for (auto& t_map : it.second) { double diff = difftime(now, t_map.second.creation_time); - if (diff >= config::pending_data_expire_time_sec) { - (*expire_txn_map)[t_map.first].push_back(txn_id); - if (VLOG_IS_ON(3)) { - VLOG_NOTICE << "find expired txn." - << " tablet=" << t_map.first.to_string() - << " transaction_id=" << txn_id << " exist_sec=" << diff; - } + if (diff < config::pending_data_expire_time_sec) { + continue; + } + + (*expire_txn_map)[t_map.first].push_back(txn_id); + if (VLOG_IS_ON(3)) { + VLOG_NOTICE << "find expired txn." 
+ << " tablet=" << t_map.first.to_string() + << " transaction_id=" << txn_id << " exist_sec=" << diff; } } } @@ -796,4 +821,27 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i _tablet_version_cache->release(handle); } +TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, TabletUid tablet_uid) { + pair<int64_t, int64_t> key(partition_id, transaction_id); + TabletInfo tablet_info(tablet_id, tablet_uid); + + std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); + + auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it == txn_tablet_map.end()) { + return TxnState::NOT_FOUND; + } + + auto& tablet_txn_info_map = it->second; + if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); + tablet_txn_info_iter == tablet_txn_info_map.end()) { + return TxnState::NOT_FOUND; + } else { + const auto& txn_info = tablet_txn_info_iter->second; + return txn_info.state; + } +} + } // namespace doris diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 6d2222a3ec5..b5a1db0b46e 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -51,6 +51,15 @@ class DeltaWriter; class OlapMeta; struct TabletPublishStatistics; +enum class TxnState { + NOT_FOUND = 0, + PREPARED = 1, + COMMITTED = 2, + ROLLEDBACK = 3, + ABORTED = 4, + DELETED = 5, +}; + struct TabletTxnInfo { PUniqueId load_id; RowsetSharedPtr rowset; @@ -61,6 +70,9 @@ struct TabletTxnInfo { int64_t creation_time; bool ingest {false}; std::shared_ptr<PartialUpdateInfo> partial_update_info; + TxnState state {TxnState::PREPARED}; + + TabletTxnInfo() = default; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} @@ -77,7 +89,14 @@ struct TabletTxnInfo { rowset_ids(ids), creation_time(UnixSeconds()) {} - TabletTxnInfo() {} + void prepare() { state = TxnState::PREPARED; } + void commit() { state = TxnState::COMMITTED; } + void rollback() { state = TxnState::ROLLEDBACK; } + void abort() { + if (state == TxnState::PREPARED) { + state = TxnState::ABORTED; + } + } }; struct CommitTabletTxnInfo { @@ -145,6 +164,10 @@ public: TTabletId tablet_id, TabletUid tablet_uid, const Version& version, TabletPublishStatistics* stats); + // only abort not committed txn + void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, + TabletUid tablet_uid); + // delete the txn from manager if it is not committed(not have a valid rowset) Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid); @@ -163,10 +186,6 @@ public: void get_all_related_tablets(std::set<TabletInfo>* tablet_infos); - // Just check if the txn exists. - bool has_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, - TabletUid tablet_uid); - // Get all expired txns and save them in expire_txn_map. // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets. 
void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map); @@ -195,6 +214,9 @@ public: int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); + TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, TabletUid tablet_uid); + private: using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index b5752710ae8..26103e50427 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -467,7 +467,7 @@ Status SnapshotLoader::remote_http_download( for (const auto& filename : filename_list) { std::string remote_file_url = fmt::format( - "http://{}:{}/api/_tablet/_download?token={}&file={}/{}", + "http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog", remote_tablet_snapshot.remote_be_addr.hostname, remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token, remote_tablet_snapshot.remote_snapshot_path, filename); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index a312996162c..ebf553b14f0 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -36,6 +36,7 @@ #include <memory> #include <ostream> #include <string> +#include <thread> #include <utility> #include <vector> @@ -63,6 +64,7 @@ #include "runtime/stream_load/stream_load_recorder.h" #include "util/arrow/row_batch.h" #include "util/defer_op.h" +#include "util/threadpool.h" #include "util/thrift_server.h" #include "util/uid_util.h" @@ -79,6 +81,283 @@ class TTransportException; namespace doris { +namespace { +constexpr uint64_t kMaxTimeoutMs = 3000; // 3s +struct IngestBinlogArg { + int64_t txn_id; + int64_t partition_id; + int64_t local_tablet_id; + TabletSharedPtr local_tablet; + TIngestBinlogRequest request; + TStatus* tstatus; +}; + +void _ingest_binlog(IngestBinlogArg* arg) { + auto txn_id = arg->txn_id; + auto partition_id = arg->partition_id; + auto local_tablet_id = arg->local_tablet_id; + const auto& local_tablet = arg->local_tablet; + const auto& local_tablet_uid = local_tablet->tablet_uid(); + + auto& request = arg->request; + + TStatus tstatus; + Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() { + LOG(INFO) << "ingest binlog. 
result: " << apache::thrift::ThriftDebugString(tstatus); + if (tstatus.status_code != TStatusCode::OK) { + // abort txn + StorageEngine::instance()->txn_manager()->abort_txn(partition_id, txn_id, + local_tablet_id, local_tablet_uid); + } + + if (ingest_binlog_tstatus) { + *ingest_binlog_tstatus = std::move(tstatus); + } + }}; + + auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) { + tstatus.__set_status_code(code); + tstatus.__isset.error_msgs = true; + tstatus.error_msgs.push_back(std::move(error_msg)); + }; + + // Step 3: get binlog info + auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, + request.remote_port); + constexpr int max_retry = 3; + + auto get_binlog_info_url = + fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url, + "get_binlog_info", request.remote_tablet_id, request.binlog_version); + std::string binlog_info; + auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_binlog_info_url)); + client->set_timeout_ms(kMaxTimeoutMs); + return client->execute(&binlog_info); + }; + auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + + std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, ":"); + // TODO(Drogon): check binlog info content is right + DCHECK(binlog_info_parts.size() == 2); + const std::string& remote_rowset_id = binlog_info_parts[0]; + int64_t num_segments = std::stoll(binlog_info_parts[1]); + + // Step 4: get rowset meta + auto get_rowset_meta_url = fmt::format( + "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url, + "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version); + std::string rowset_meta_str; + auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_rowset_meta_url)); + client->set_timeout_ms(kMaxTimeoutMs); + return client->execute(&rowset_meta_str); + }; + status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + RowsetMetaPB rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) { + LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url; + status = Status::InternalError("failed to parse rowset meta"); + status.to_thrift(&tstatus); + return; + } + // rewrite rowset meta + rowset_meta_pb.set_tablet_id(local_tablet_id); + rowset_meta_pb.set_partition_id(partition_id); + rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash()); + rowset_meta_pb.set_txn_id(txn_id); + rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED); + auto rowset_meta = std::make_shared<RowsetMeta>(); + if (!rowset_meta->init_from_pb(rowset_meta_pb)) { + LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url; + status = Status::InternalError("failed to init rowset meta"); + status.to_thrift(&tstatus); + return; + } + RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id(); + rowset_meta->set_rowset_id(new_rowset_id); + rowset_meta->set_tablet_uid(local_tablet->tablet_uid()); + + // Step 5: get all 
segment files + // Step 5.1: get all segment files size + std::vector<std::string> segment_file_urls; + segment_file_urls.reserve(num_segments); + std::vector<uint64_t> segment_file_sizes; + segment_file_sizes.reserve(num_segments); + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + auto get_segment_file_size_url = fmt::format( + "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url, + "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index); + uint64_t segment_file_size; + auto get_segment_file_size_cb = [&get_segment_file_size_url, + &segment_file_size](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_segment_file_size_url)); + client->set_timeout_ms(kMaxTimeoutMs); + RETURN_IF_ERROR(client->head()); + return client->get_content_length(&segment_file_size); + }; + + status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + + segment_file_sizes.push_back(segment_file_size); + segment_file_urls.push_back(std::move(get_segment_file_size_url)); + } + + // Step 5.2: check data capacity + uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), 0); + if (!local_tablet->can_add_binlog(total_size)) { + LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size + << ", tablet=" << local_tablet->tablet_id(); + status = Status::InternalError("no enough space"); + status.to_thrift(&tstatus); + return; + } + + // Step 5.3: get all segment files + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + auto segment_file_size = segment_file_sizes[segment_index]; + auto get_segment_file_url = segment_file_urls[segment_index]; + + uint64_t estimate_timeout = + segment_file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + + auto local_segment_path = BetaRowset::segment_file_path( + local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index); + LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url, + local_segment_path); + auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size, + estimate_timeout](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_segment_file_url)); + client->set_timeout_ms(estimate_timeout * 1000); + RETURN_IF_ERROR(client->download(local_segment_path)); + + std::error_code ec; + // Check file length + uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec); + if (ec) { + LOG(WARNING) << "download file error" << ec.message(); + return Status::IOError("can't retrive file_size of {}, due to {}", + local_segment_path, ec.message()); + } + if (local_file_size != segment_file_size) { + LOG(WARNING) << "download file length error" + << ", get_segment_file_url=" << get_segment_file_url + << ", file_size=" << segment_file_size + << ", local_file_size=" << local_file_size; + return Status::InternalError("downloaded file size is not equal"); + } + chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR); + return Status::OK(); + }; + + auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + } + + // Step 6: create rowset && calculate delete bitmap && commit + // Step 6.1: create rowset + RowsetSharedPtr rowset; + status = RowsetFactory::create_rowset(local_tablet->tablet_schema(), + local_tablet->tablet_path(), rowset_meta, &rowset); + + if (!status) { + LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet" + << ". rowset_id: " << rowset_meta_pb.rowset_id() + << ", rowset_type: " << rowset_meta_pb.rowset_type() + << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + + // Step 6.2 calculate delete bitmap before commit + auto calc_delete_bitmap_token = + StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id); + RowsetIdUnorderedSet pre_rowset_ids; + if (local_tablet->enable_unique_key_merge_on_write()) { + auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get()); + std::vector<segment_v2::SegmentSharedPtr> segments; + status = beta_rowset->load_segments(&segments); + if (!status) { + LOG(WARNING) << "failed to load segments from rowset" + << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + if (segments.size() > 1) { + // calculate delete bitmap between segments + status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments, + delete_bitmap); + if (!status) { + LOG(WARNING) << "failed to calculate delete bitmap" + << ". tablet_id: " << local_tablet->tablet_id() + << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + } + + static_cast<void>(local_tablet->commit_phase_update_delete_bitmap( + rowset, pre_rowset_ids, delete_bitmap, segments, txn_id, + calc_delete_bitmap_token.get(), nullptr)); + static_cast<void>(calc_delete_bitmap_token->wait()); + } + + // Step 6.3: commit txn + Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( + local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), + rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(), + rowset_meta->load_id(), rowset, false); + if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) { + auto err_msg = fmt::format( + "failed to commit txn for remote tablet. 
rowset_id: {}, remote_tablet_id={}, " + "txn_id={}, status={}", + rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(), + rowset_meta->txn_id(), commit_txn_status.to_string()); + LOG(WARNING) << err_msg; + set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg)); + return; + } + + if (local_tablet->enable_unique_key_merge_on_write()) { + StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( + partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true, + delete_bitmap, pre_rowset_ids, nullptr); + } + + tstatus.__set_status_code(TStatusCode::OK); +} +} // namespace + using apache::thrift::TException; using apache::thrift::TProcessor; using apache::thrift::TMultiplexedProcessor; @@ -90,19 +369,33 @@ BackendService::BackendService(ExecEnv* exec_env) Status BackendService::create_service(ExecEnv* exec_env, int port, std::unique_ptr<ThriftServer>* server) { - std::shared_ptr<BackendService> handler(new BackendService(exec_env)); + auto service = std::make_shared<BackendService>(exec_env); // TODO: do we want a BoostThreadFactory? // TODO: we want separate thread factories here, so that fe requests can't starve // be requests - std::shared_ptr<ThreadFactory> thread_factory(new ThreadFactory()); - - std::shared_ptr<TProcessor> be_processor(new BackendServiceProcessor(handler)); + // std::shared_ptr<TProcessor> be_processor = std::make_shared<BackendServiceProcessor>(service); + auto be_processor = std::make_shared<BackendServiceProcessor>(service); *server = std::make_unique<ThriftServer>("backend", be_processor, port, config::be_service_threads); LOG(INFO) << "Doris BackendService listening on " << port; + auto thread_num = config::ingest_binlog_work_pool_size; + if (thread_num < 0) { + LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we will in sync mode", + thread_num); + return Status::OK(); + } + + if (thread_num == 0) { + thread_num = std::thread::hardware_concurrency(); + } + static_cast<void>(doris::ThreadPoolBuilder("IngestBinlog") + .set_min_threads(thread_num) + .set_max_threads(thread_num * 2) + .build(&(service->_ingest_binlog_workers))); + LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async mode", thread_num); return Status::OK(); } @@ -396,8 +689,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) { LOG(INFO) << "ingest binlog. 
request: " << apache::thrift::ThriftDebugString(request); - constexpr uint64_t kMaxTimeoutMs = 1000; - TStatus tstatus; Defer defer {[&result, &tstatus]() { result.__set_status(tstatus); @@ -452,6 +743,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); return; } + if (!request.__isset.local_tablet_id) { + auto error_msg = "local_tablet_id is empty"; + LOG(WARNING) << error_msg; + set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); + return; + } if (!request.__isset.load_id) { auto error_msg = "load_id is empty"; LOG(WARNING) << error_msg; @@ -486,239 +783,105 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, return; } - // Step 3: get binlog info - auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, - request.remote_port); - constexpr int max_retry = 3; + bool is_async = (_ingest_binlog_workers != nullptr); + result.__set_is_async(is_async); - auto get_binlog_info_url = - fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url, - "get_binlog_info", request.remote_tablet_id, request.binlog_version); - std::string binlog_info; - auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { - RETURN_IF_ERROR(client->init(get_binlog_info_url)); - client->set_timeout_ms(kMaxTimeoutMs); - return client->execute(&binlog_info); - }; - status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb); - if (!status.ok()) { - LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; - } + auto ingest_binlog_func = [=, tstatus = &tstatus]() { + IngestBinlogArg ingest_binlog_arg = { + .txn_id = txn_id, + .partition_id = partition_id, + .local_tablet_id = local_tablet_id, + .local_tablet = local_tablet, - std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, ":"); - // TODO(Drogon): check binlog info content is right - DCHECK(binlog_info_parts.size() == 2); - const std::string& remote_rowset_id = binlog_info_parts[0]; - int64_t num_segments = std::stoll(binlog_info_parts[1]); + .request = std::move(request), + .tstatus = is_async ? 
nullptr : tstatus, + }; - // Step 4: get rowset meta - auto get_rowset_meta_url = fmt::format( - "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url, - "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version); - std::string rowset_meta_str; - auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { - RETURN_IF_ERROR(client->init(get_rowset_meta_url)); - client->set_timeout_ms(kMaxTimeoutMs); - return client->execute(&rowset_meta_str); + _ingest_binlog(&ingest_binlog_arg); }; - status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb); - if (!status.ok()) { - LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; - } - RowsetMetaPB rowset_meta_pb; - if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) { - LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url; - status = Status::InternalError("failed to parse rowset meta"); - status.to_thrift(&tstatus); - return; - } - // rewrite rowset meta - rowset_meta_pb.set_tablet_id(local_tablet_id); - rowset_meta_pb.set_partition_id(partition_id); - rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash()); - rowset_meta_pb.set_txn_id(txn_id); - rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED); - auto rowset_meta = std::make_shared<RowsetMeta>(); - if (!rowset_meta->init_from_pb(rowset_meta_pb)) { - LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url; - status = Status::InternalError("failed to init rowset meta"); - status.to_thrift(&tstatus); - return; - } - RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id(); - rowset_meta->set_rowset_id(new_rowset_id); - rowset_meta->set_tablet_uid(local_tablet->tablet_uid()); - // Step 5: get all segment files - // Step 5.1: get all segment files size - std::vector<std::string> segment_file_urls; - segment_file_urls.reserve(num_segments); - std::vector<uint64_t> segment_file_sizes; - segment_file_sizes.reserve(num_segments); - for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { - auto get_segment_file_size_url = fmt::format( - "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url, - "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index); - uint64_t segment_file_size; - auto get_segment_file_size_cb = [&get_segment_file_size_url, - &segment_file_size](HttpClient* client) { - RETURN_IF_ERROR(client->init(get_segment_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); - RETURN_IF_ERROR(client->head()); - return client->get_content_length(&segment_file_size); - }; - - status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb); + if (is_async) { + status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); if (!status.ok()) { - LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url - << ", status=" << status.to_string(); status.to_thrift(&tstatus); return; } - - segment_file_sizes.push_back(segment_file_size); - segment_file_urls.push_back(std::move(get_segment_file_size_url)); - } - - // Step 5.2: check data capacity - uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), 0); - if (!local_tablet->can_add_binlog(total_size)) { - LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size - << ", tablet=" << 
local_tablet->tablet_id(); - status = Status::InternalError("no enough space"); - status.to_thrift(&tstatus); - return; + } else { + ingest_binlog_func(); } +} - // Step 5.3: get all segment files - for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { - auto segment_file_size = segment_file_sizes[segment_index]; - auto get_segment_file_url = segment_file_urls[segment_index]; - - uint64_t estimate_timeout = - segment_file_size / config::download_low_speed_limit_kbps / 1024; - if (estimate_timeout < config::download_low_speed_time) { - estimate_timeout = config::download_low_speed_time; - } - - auto local_segment_path = BetaRowset::segment_file_path( - local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index); - LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url, - local_segment_path); - auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size, - estimate_timeout](HttpClient* client) { - RETURN_IF_ERROR(client->init(get_segment_file_url)); - client->set_timeout_ms(estimate_timeout * 1000); - RETURN_IF_ERROR(client->download(local_segment_path)); +void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result, + const TQueryIngestBinlogRequest& request) { + LOG(INFO) << "query ingest binlog. request: " << apache::thrift::ThriftDebugString(request); - std::error_code ec; - // Check file length - uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec); - if (ec) { - LOG(WARNING) << "download file error" << ec.message(); - return Status::IOError("can't retrive file_size of {}, due to {}", - local_segment_path, ec.message()); - } - if (local_file_size != segment_file_size) { - LOG(WARNING) << "download file length error" - << ", get_segment_file_url=" << get_segment_file_url - << ", file_size=" << segment_file_size - << ", local_file_size=" << local_file_size; - return Status::InternalError("downloaded file size is not equal"); - } - chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR); - return Status::OK(); - }; + auto set_result = [&](TIngestBinlogStatus::type status, std::string error_msg) { + result.__set_status(status); + result.__set_err_msg(std::move(error_msg)); + }; - auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb); - if (!status.ok()) { - LOG(WARNING) << "failed to get segment file from " << get_segment_file_url - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; - } + /// Check args: txn_id, partition_id, tablet_id, load_id + if (!request.__isset.txn_id) { + auto error_msg = "txn_id is empty"; + LOG(WARNING) << error_msg; + set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); + return; } - - // Step 6: create rowset && calculate delete bitmap && commit - // Step 6.1: create rowset - RowsetSharedPtr rowset; - status = RowsetFactory::create_rowset(local_tablet->tablet_schema(), - local_tablet->tablet_path(), rowset_meta, &rowset); - - if (!status) { - LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet" - << ". 
rowset_id: " << rowset_meta_pb.rowset_id() - << ", rowset_type: " << rowset_meta_pb.rowset_type() - << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); + if (!request.__isset.partition_id) { + auto error_msg = "partition_id is empty"; + LOG(WARNING) << error_msg; + set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); return; } - - // Step 6.2 calculate delete bitmap before commit - auto calc_delete_bitmap_token = - StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); - DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id); - RowsetIdUnorderedSet pre_rowset_ids; - if (local_tablet->enable_unique_key_merge_on_write()) { - auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get()); - std::vector<segment_v2::SegmentSharedPtr> segments; - status = beta_rowset->load_segments(&segments); - if (!status) { - LOG(WARNING) << "failed to load segments from rowset" - << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; - } - if (segments.size() > 1) { - // calculate delete bitmap between segments - status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments, - delete_bitmap); - if (!status) { - LOG(WARNING) << "failed to calculate delete bitmap" - << ". tablet_id: " << local_tablet->tablet_id() - << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; - } - } - - static_cast<void>(local_tablet->commit_phase_update_delete_bitmap( - rowset, pre_rowset_ids, delete_bitmap, segments, txn_id, - calc_delete_bitmap_token.get(), nullptr)); - static_cast<void>(calc_delete_bitmap_token->wait()); + if (!request.__isset.tablet_id) { + auto error_msg = "tablet_id is empty"; + LOG(WARNING) << error_msg; + set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); + return; } - - // Step 6.3: commit txn - Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( - local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), - rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(), - rowset_meta->load_id(), rowset, false); - if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) { - auto err_msg = fmt::format( - "failed to commit txn for remote tablet. 
rowset_id: {}, remote_tablet_id={}, " - "txn_id={}, status={}", - rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(), - rowset_meta->txn_id(), commit_txn_status.to_string()); - LOG(WARNING) << err_msg; - set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg)); + if (!request.__isset.load_id) { + auto error_msg = "load_id is empty"; + LOG(WARNING) << error_msg; + set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); return; } - if (local_tablet->enable_unique_key_merge_on_write()) { - StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( - partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true, - delete_bitmap, pre_rowset_ids, nullptr); + auto partition_id = request.partition_id; + auto txn_id = request.txn_id; + auto tablet_id = request.tablet_id; + + // Step 1: get local tablet + auto local_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (local_tablet == nullptr) { + auto error_msg = fmt::format("tablet {} not found", tablet_id); + LOG(WARNING) << error_msg; + set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg)); + return; } - tstatus.__set_status_code(TStatusCode::OK); + // Step 2: get txn state + auto tablet_uid = local_tablet->tablet_uid(); + auto txn_state = StorageEngine::instance()->txn_manager()->get_txn_state(partition_id, txn_id, + tablet_id, tablet_uid); + switch (txn_state) { + case TxnState::NOT_FOUND: + result.__set_status(TIngestBinlogStatus::NOT_FOUND); + break; + case TxnState::PREPARED: + result.__set_status(TIngestBinlogStatus::DOING); + break; + case TxnState::COMMITTED: + result.__set_status(TIngestBinlogStatus::OK); + break; + case TxnState::ROLLEDBACK: + result.__set_status(TIngestBinlogStatus::FAILED); + break; + case TxnState::ABORTED: + result.__set_status(TIngestBinlogStatus::FAILED); + break; + case TxnState::DELETED: + result.__set_status(TIngestBinlogStatus::FAILED); + break; + } } } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 8ad55e43e6e..09a79a68bf0 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -58,6 +58,7 @@ class TTransmitDataParams; class TUniqueId; class TIngestBinlogRequest; class TIngestBinlogResult; +class ThreadPool; // This class just forward rpc for actual handler // make this class because we can bind multiple service on single point @@ -137,10 +138,14 @@ public: void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override; + void query_ingest_binlog(TQueryIngestBinlogResult& result, + const TQueryIngestBinlogRequest& request) override; + private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); ExecEnv* _exec_env; std::unique_ptr<AgentServer> _agent_server; + std::unique_ptr<ThreadPool> _ingest_binlog_workers; }; } // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 41578d976dd..c4f24cfdf8f 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -17,6 +17,9 @@ #include "service/http_service.h" +#include <event2/bufferevent.h> +#include <event2/http.h> + #include <algorithm> #include <string> #include <vector> @@ -59,6 +62,30 @@ #include "util/doris_metrics.h" namespace doris { +namespace { +std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* event_base) { + auto rate_limit = config::download_binlog_rate_limit_kbs; + if (rate_limit <= 0) { + return nullptr; + } + + auto 
max_value = std::numeric_limits<int32_t>::max() / 1024 * 10; + if (rate_limit > max_value) { + LOG(WARNING) << "rate limit is too large, set to max value."; + rate_limit = max_value; + } + struct timeval cfg_tick = {0, 100 * 1000}; // 100ms + rate_limit = rate_limit / 10 * 1024; // convert to KB/S + + auto token_bucket = std::unique_ptr<ev_token_bucket_cfg, decltype(&ev_token_bucket_cfg_free)>( + ev_token_bucket_cfg_new(rate_limit, rate_limit * 2, rate_limit, rate_limit * 2, + &cfg_tick), + ev_token_bucket_cfg_free); + return std::shared_ptr<bufferevent_rate_limit_group>( + bufferevent_rate_limit_group_new(event_base, token_bucket.get()), + bufferevent_rate_limit_group_free); +} +} // namespace HttpService::HttpService(ExecEnv* env, int port, int num_threads) : _env(env), @@ -72,6 +99,9 @@ HttpService::~HttpService() { Status HttpService::start() { add_default_path_handlers(_web_page_handler.get()); + auto event_base = _ev_http_server->get_event_bases()[0]; + _rate_limit_group = get_rate_limit_group(event_base.get()); + // register load StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env)); _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", @@ -93,18 +123,19 @@ Status HttpService::start() { for (auto& path : _env->store_paths()) { allow_paths.emplace_back(path.path); } - DownloadAction* download_action = _pool.add(new DownloadAction(_env, allow_paths)); + DownloadAction* download_action = _pool.add(new DownloadAction(_env, nullptr, allow_paths)); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); - DownloadAction* tablet_download_action = _pool.add(new DownloadAction(_env, allow_paths)); + DownloadAction* tablet_download_action = + _pool.add(new DownloadAction(_env, _rate_limit_group, allow_paths)); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download", tablet_download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); if (config::enable_single_replica_load) { DownloadAction* single_replica_download_action = _pool.add(new DownloadAction( - _env, allow_paths, config::single_replica_load_download_num_workers)); + _env, nullptr, allow_paths, config::single_replica_load_download_num_workers)); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_single_replica/_download", single_replica_download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_single_replica/_download", @@ -118,7 +149,8 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); - DownloadBinlogAction* download_binlog_action = _pool.add(new DownloadBinlogAction(_env)); + DownloadBinlogAction* download_binlog_action = + _pool.add(new DownloadBinlogAction(_env, _rate_limit_group)); _ev_http_server->register_handler(HttpMethod::GET, "/api/_binlog/_download", download_binlog_action); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_binlog/_download", diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h index 46b5b3c5b3c..eeaf9516594 100644 --- a/be/src/service/http_service.h +++ b/be/src/service/http_service.h @@ -22,6 +22,8 @@ #include "common/object_pool.h" #include "common/status.h" +struct bufferevent_rate_limit_group; + namespace doris { class ExecEnv; @@ -47,6 +49,8 @@ private: std::unique_ptr<EvHttpServer> _ev_http_server; 
     std::unique_ptr<WebPageHandler> _web_page_handler;
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
+
     bool stopped = false;
 };
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index a2f3867de2b..ba66d07ec6b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -34,6 +34,8 @@ import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TPublishTopicResult;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -229,6 +231,12 @@ public class GenericPoolTest {
         public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest ingestBinlogRequest) throws TException {
             return null;
         }
+
+        @Override
+        public TQueryIngestBinlogResult queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+                throws TException {
+            return null;
+        }
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 058e356a325..860b58a47a7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -52,6 +52,8 @@ import org.apache.doris.thrift.TMasterInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TPublishTopicResult;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -375,6 +377,12 @@ public class MockedBackendFactory {
         public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest ingestBinlogRequest) throws TException {
             return null;
         }
+
+        @Override
+        public TQueryIngestBinlogResult queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+                throws TException {
+            return null;
+        }
     }
 
     // The default Brpc service.
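For reference, the libevent primitives used by get_rate_limit_group above implement a shared token bucket: ev_token_bucket_cfg_new fixes a per-tick read/write budget, and every bufferevent added to the group draws from that single budget, so all concurrent binlog downloads together stay under download_binlog_rate_limit_kbs. A minimal standalone sketch, assuming only the libevent 2.x API shown in the diff (the make_limit_group name and kbs parameter are illustrative, not part of this commit):

    #include <event2/bufferevent.h>
    #include <event2/event.h>

    // Cap the combined throughput of all bufferevents in the group at `kbs` KB/s.
    // With a 100ms tick there are 10 refills per second, so the per-tick budget
    // is kbs * 1024 / 10 bytes; dividing by 10 before multiplying, as the commit
    // does, also keeps the intermediate value inside int32 range.
    bufferevent_rate_limit_group* make_limit_group(event_base* base, int kbs) {
        struct timeval tick = {0, 100 * 1000}; // 100ms refill interval
        size_t bytes_per_tick = static_cast<size_t>(kbs) / 10 * 1024;
        ev_token_bucket_cfg* cfg = ev_token_bucket_cfg_new(
                bytes_per_tick, bytes_per_tick * 2, // read rate / burst
                bytes_per_tick, bytes_per_tick * 2, // write rate / burst
                &tick);
        bufferevent_rate_limit_group* group = bufferevent_rate_limit_group_new(base, cfg);
        ev_token_bucket_cfg_free(cfg); // the group keeps its own copy of the cfg
        return group;
    }

    // Per connection: bufferevent_add_to_rate_limit_group(bev, group);
    // On shutdown:    bufferevent_rate_limit_group_free(group);

This also explains the constructor changes in http_service.cpp above: actions that should not be throttled are constructed with nullptr instead of the shared group.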
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index a7a9c50aed2..d35d6166d3b 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -136,6 +136,28 @@ struct TIngestBinlogRequest {
 
 struct TIngestBinlogResult {
     1: optional Status.TStatus status;
+    2: optional bool is_async;
+}
+
+struct TQueryIngestBinlogRequest {
+    1: optional i64 txn_id;
+    2: optional i64 partition_id;
+    3: optional i64 tablet_id;
+    4: optional Types.TUniqueId load_id;
+}
+
+enum TIngestBinlogStatus {
+    ANALYSIS_ERROR,
+    UNKNOWN,
+    NOT_FOUND,
+    OK,
+    FAILED,
+    DOING
+}
+
+struct TQueryIngestBinlogResult {
+    1: optional TIngestBinlogStatus status;
+    2: optional string err_msg;
 }
 
 enum TTopicInfoType {
@@ -211,6 +233,7 @@ service BackendService {
     TCheckStorageFormatResult check_storage_format();
 
     TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest ingest_binlog_request);
+    TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest query_ingest_binlog_request);
 
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request);
 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 493c4daa493..62b257b1bc8 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -72,7 +72,6 @@ class Syncer {
     }
 
     private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {
-
         // step 1: check binlog availability
         if (binlog == null) {
             return false
@@ -735,6 +734,7 @@
             if (!binlogRecords.contains(srcPartition.key)) {
                 continue
             }
+
             Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
             Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
 
@@ -771,6 +771,7 @@
                 logger.info("request -> ${request}")
                 TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
                 if (!checkIngestBinlog(result)) {
+                    logger.error("Ingest binlog error! result: ${result}")
                     return false
                 }
diff --git a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
index cef48715aeb..3004e344cc9 100644
--- a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
@@ -22,6 +22,7 @@ suite("test_ingest_binlog") {
         logger.info("fe enable_feature_binlog is false, skip case test_ingest_binlog")
         return
     }
+
     def tableName = "tbl_ingest_binlog"
     def insert_num = 5
     def test_num = 0
@@ -102,6 +103,7 @@
     logger.info("=== Test 2.2: Wrong binlog version case ===")
     // -1 means use the number of syncer.context
    // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
+    // use fakeVersion = 1; version 1 is the first version of a Doris BE tablet, so it has no binlog, only an HTTP error
    assertTrue(syncer.ingestBinlog(-1, 1) == false)
 
@@ -120,4 +122,4 @@
 
     // End Test 2
     syncer.closeBackendClients()
-}
\ No newline at end of file
+}
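Taken together, the thrift changes define the new asynchronous contract: ingest_binlog may now return immediately with is_async set (the work presumably runs on the new _ingest_binlog_workers pool), and the caller polls query_ingest_binlog with the same txn_id/partition_id/tablet_id/load_id until the status leaves DOING — per the switch above, PREPARED maps to DOING, COMMITTED to OK, and ROLLEDBACK/ABORTED/DELETED to FAILED. A hedged client-side sketch against the generated C++ stubs; the wait_ingest_binlog helper and the one-second poll interval are assumptions for illustration, not code from this commit:

    #include <chrono>
    #include <thread>

    #include <gen_cpp/BackendService.h> // thrift-generated service stubs

    namespace doris {

    // Poll query_ingest_binlog until the ingest settles and return the final
    // state (OK, FAILED, NOT_FOUND, or UNKNOWN if the BE did not set one).
    TIngestBinlogStatus::type wait_ingest_binlog(BackendServiceClient& client, int64_t txn_id,
                                                 int64_t partition_id, int64_t tablet_id,
                                                 const TUniqueId& load_id) {
        TQueryIngestBinlogRequest request;
        request.__set_txn_id(txn_id);
        request.__set_partition_id(partition_id);
        request.__set_tablet_id(tablet_id);
        request.__set_load_id(load_id);

        TQueryIngestBinlogResult result;
        do {
            std::this_thread::sleep_for(std::chrono::seconds(1)); // assumed poll interval
            client.query_ingest_binlog(result, request);
        } while (result.__isset.status && result.status == TIngestBinlogStatus::DOING);
        return result.__isset.status ? result.status : TIngestBinlogStatus::UNKNOWN;
    }

    } // namespace doris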