This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new b7680fd95e2 [fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349) b7680fd95e2 is described below commit b7680fd95e2886737b1a08ccbeec525c38e36d56 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Fri Nov 3 12:28:45 2023 +0800 [fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349) --- be/src/io/fs/broker_file_reader.cpp | 81 +++------------------- be/src/io/fs/broker_file_reader.h | 11 ++- be/src/io/fs/broker_file_system.cpp | 135 ++++++++++++++++++++++++++---------- be/src/io/fs/broker_file_system.h | 7 +- be/src/runtime/client_cache.h | 2 + 5 files changed, 121 insertions(+), 115 deletions(-) diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index c48433ba23a..fa21e754342 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -35,19 +35,16 @@ #include "io/fs/broker_file_system.h" #include "util/doris_metrics.h" -namespace doris { -namespace io { +namespace doris::io { class IOContext; -BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, - size_t file_size, TBrokerFD fd, - std::shared_ptr<BrokerFileSystem> fs) - : _path(path), +BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, + TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs) + : _path(std::move(path)), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(std::move(fs)) { - _fs->get_client(&_client); DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } @@ -59,32 +56,7 @@ BrokerFileReader::~BrokerFileReader() { Status BrokerFileReader::close() { bool expected = false; if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { - 
TBrokerCloseReaderRequest request; - request.__set_version(TBrokerVersion::VERSION_ONE); - request.__set_fd(_fd); - - TBrokerOperationStatus response; - try { - try { - (*_client)->closeReader(response, request); - } catch (apache::thrift::transport::TTransportException&) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->closeReader(response, request); - } - } catch (apache::thrift::TException& e) { - std::stringstream ss; - ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what(); - return Status::RpcError(ss.str()); - } - - if (response.statusCode != TBrokerOperationStatusCode::OK) { - std::stringstream ss; - ss << "close broker reader failed, broker:" << _broker_addr - << " failed:" << response.message; - return Status::InternalError(ss.str()); - } - + RETURN_IF_ERROR(_fs->close_file(_fd)); DorisMetrics::instance()->broker_file_open_reading->increment(-1); } return Status::OK(); @@ -100,45 +72,12 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes return Status::OK(); } - TBrokerPReadRequest request; - request.__set_version(TBrokerVersion::VERSION_ONE); - request.__set_fd(_fd); - request.__set_offset(offset); - request.__set_length(bytes_req); - - TBrokerReadResponse response; - try { - VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset - << ", read bytes length:" << bytes_req; - try { - (*_client)->pread(response, request); - } catch (apache::thrift::transport::TTransportException& e) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*_client).reopen()); - LOG(INFO) << "retry reading from broker: " << _broker_addr << ". 
reason: " << e.what(); - (*_client)->pread(response, request); - } - } catch (apache::thrift::TException& e) { - std::stringstream ss; - ss << "Open broker reader failed, broker:" << _broker_addr << " failed:" << e.what(); - return Status::RpcError(ss.str()); - } - - if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) { - // read the end of broker's file - *bytes_read = 0; - return Status::OK(); - } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) { - std::stringstream ss; - ss << "Open broker reader failed, broker:" << _broker_addr - << " failed:" << response.opStatus.message; - return Status::InternalError(ss.str()); - } + std::string data; + RETURN_IF_ERROR(_fs->read_file(_fd, offset, bytes_req, &data)); - *bytes_read = response.data.size(); - memcpy(to, response.data.data(), *bytes_read); + *bytes_read = data.size(); + memcpy(to, data.data(), *bytes_read); return Status::OK(); } -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index a7f126ba0db..3f51fb979c3 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -32,15 +32,14 @@ #include "runtime/client_cache.h" #include "util/slice.h" -namespace doris { -namespace io { +namespace doris::io { class IOContext; class BrokerFileReader : public FileReader { public: - BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size, - TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs); + BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, + std::shared_ptr<BrokerFileSystem> fs); ~BrokerFileReader() override; @@ -66,8 +65,6 @@ private: TBrokerFD _fd; std::shared_ptr<BrokerFileSystem> _fs; - std::shared_ptr<BrokerServiceConnection> _client; std::atomic<bool> _closed = false; }; -} // namespace io -} // namespace doris +} // namespace doris::io diff --git 
a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 3578fb91eaa..a3c93c04a7a 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -45,8 +45,7 @@ #include "runtime/exec_env.h" #include "util/slice.h" -namespace doris { -namespace io { +namespace doris::io { #ifdef BE_TEST inline BrokerServiceClientCache* client_cache() { @@ -70,8 +69,8 @@ inline const std::string& client_id(const TNetworkAddress& addr) { #ifndef CHECK_BROKER_CLIENT #define CHECK_BROKER_CLIENT(client) \ - if (!client) { \ - return Status::IOError("init Broker client error"); \ + if (!client || !client->is_alive()) { \ + return Status::IOError("connect to broker failed"); \ } #endif @@ -90,8 +89,8 @@ BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr, Status BrokerFileSystem::connect_impl() { Status status = Status::OK(); - _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr, - config::thrift_rpc_timeout_ms, &status)); + _connection = std::make_unique<BrokerServiceConnection>(client_cache(), _broker_addr, + config::thrift_rpc_timeout_ms, &status); return status; } @@ -109,7 +108,7 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat RETURN_IF_ERROR(file_size_impl(abs_path, &fsize)); } - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); TBrokerOpenReaderRequest request; request.__set_version(TBrokerVersion::VERSION_ONE); request.__set_path(abs_path); @@ -121,11 +120,11 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat try { Status status; try { - (*_client)->openReader(*response, request); + (*_connection)->openReader(*response, request); } catch (apache::thrift::transport::TTransportException&) { std::this_thread::sleep_for(std::chrono::seconds(1)); - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->openReader(*response, request); + RETURN_IF_ERROR((*_connection).reopen()); + 
(*_connection)->openReader(*response, request); } } catch (apache::thrift::TException& e) { return Status::RpcError("failed to open file {}: {}", abs_path.native(), @@ -147,7 +146,7 @@ Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*fail } Status BrokerFileSystem::delete_file_impl(const Path& file) { - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); try { // rm file from remote path TBrokerDeletePathRequest del_req; @@ -157,10 +156,10 @@ Status BrokerFileSystem::delete_file_impl(const Path& file) { del_req.__set_properties(_broker_prop); try { - (*_client)->deletePath(del_rep, del_req); + (*_connection)->deletePath(del_rep, del_req); } catch (apache::thrift::transport::TTransportException&) { - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->deletePath(del_rep, del_req); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->deletePath(del_rep, del_req); } if (del_rep.statusCode == TBrokerOperationStatusCode::OK) { @@ -187,7 +186,7 @@ Status BrokerFileSystem::batch_delete_impl(const std::vector<Path>& files) { } Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const { - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); *res = false; try { TBrokerCheckPathExistRequest check_req; @@ -197,10 +196,10 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const { check_req.__set_properties(_broker_prop); try { - (*_client)->checkPathExist(check_rep, check_req); + (*_connection)->checkPathExist(check_rep, check_req); } catch (apache::thrift::transport::TTransportException&) { - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->checkPathExist(check_rep, check_req); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->checkPathExist(check_rep, check_req); } if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { @@ -220,7 +219,7 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const { } Status 
BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const { - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); try { TBrokerFileSizeRequest req; req.__set_version(TBrokerVersion::VERSION_ONE); @@ -229,10 +228,10 @@ Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) co TBrokerFileSizeResponse resp; try { - (*_client)->fileSize(resp, req); + (*_connection)->fileSize(resp, req); } catch (apache::thrift::transport::TTransportException&) { - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->fileSize(resp, req); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->fileSize(resp, req); } if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) { @@ -257,7 +256,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector< if (!(*exists)) { return Status::OK(); } - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); Status status = Status::OK(); try { // get existing files from remote path @@ -270,10 +269,10 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector< list_req.__set_fileNameOnly(true); // we only need file name, not abs path try { - (*_client)->listPath(list_rep, list_req); + (*_connection)->listPath(list_rep, list_req); } catch (apache::thrift::transport::TTransportException&) { - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->listPath(list_rep, list_req); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->listPath(list_rep, list_req); } if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) { @@ -310,7 +309,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector< } Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) { - CHECK_BROKER_CLIENT(_client); + CHECK_BROKER_CLIENT(_connection); try { TBrokerOperationStatus op_status; TBrokerRenamePathRequest rename_req; @@ -320,10 +319,10 @@ Status 
BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name rename_req.__set_properties(_broker_prop); try { - (*_client)->renamePath(op_status, rename_req); + (*_connection)->renamePath(op_status, rename_req); } catch (apache::thrift::transport::TTransportException&) { - RETURN_IF_ERROR((*_client).reopen()); - (*_client)->renamePath(op_status, rename_req); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->renamePath(op_status, rename_req); } if (op_status.statusCode != TBrokerOperationStatusCode::OK) { @@ -470,9 +469,76 @@ Status BrokerFileSystem::direct_download_impl(const Path& remote_file, std::stri return Status::OK(); } -Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const { - CHECK_BROKER_CLIENT(_client); - *client = _client; +Status BrokerFileSystem::read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req, + std::string* data) const { + if (data == nullptr) { + return Status::InvalidArgument("data should be not null"); + } + CHECK_BROKER_CLIENT(_connection); + TBrokerPReadRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_fd(fd); + request.__set_offset(offset); + request.__set_length(bytes_req); + + TBrokerReadResponse response; + try { + VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset + << ", read bytes length:" << bytes_req; + try { + (*_connection)->pread(response, request); + } catch (apache::thrift::transport::TTransportException& e) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*_connection).reopen()); + LOG(INFO) << "retry reading from broker: " << _broker_addr << ". 
reason: " << e.what(); + (*_connection)->pread(response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "read broker file failed, broker:" << _broker_addr << " failed:" << e.what(); + return Status::RpcError(ss.str()); + } + + if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) { + // read the end of broker's file + return Status::OK(); + } + if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "Open broker reader failed, broker:" << _broker_addr + << " failed:" << response.opStatus.message; + return Status::InternalError(ss.str()); + } + *data = std::move(response.data); + return Status::OK(); +} + +Status BrokerFileSystem::close_file(const TBrokerFD& fd) const { + CHECK_BROKER_CLIENT(_connection); + TBrokerCloseReaderRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_fd(fd); + + TBrokerOperationStatus response; + try { + try { + (*_connection)->closeReader(response, request); + } catch (apache::thrift::transport::TTransportException&) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*_connection).reopen()); + (*_connection)->closeReader(response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "close broker file failed, broker:" << _broker_addr << " failed:" << e.what(); + return Status::RpcError(ss.str()); + } + + if (response.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "close broker file failed, broker:" << _broker_addr << " failed:" << response.message; + return Status::InternalError(ss.str()); + } return Status::OK(); } @@ -480,5 +546,4 @@ std::string BrokerFileSystem::error_msg(const std::string& err) const { return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err); } -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/io/fs/broker_file_system.h 
b/be/src/io/fs/broker_file_system.h index 1e29b10a744..49a2ced67d9 100644 --- a/be/src/io/fs/broker_file_system.h +++ b/be/src/io/fs/broker_file_system.h @@ -19,6 +19,7 @@ #include <stdint.h> +#include <cstddef> #include <map> #include <memory> #include <string> @@ -44,7 +45,9 @@ public: ~BrokerFileSystem() override = default; - Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const; + Status read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req, std::string* data) const; + + Status close_file(const TBrokerFD& fd) const; protected: Status connect_impl() override; @@ -80,7 +83,7 @@ private: const TNetworkAddress& _broker_addr; const std::map<std::string, std::string>& _broker_prop; - std::shared_ptr<BrokerServiceConnection> _client; + std::unique_ptr<BrokerServiceConnection> _connection; }; } // namespace io } // namespace doris diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index 7ba5cafabea..ff45055ca65 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -187,6 +187,8 @@ public: Status reopen() { return _client_cache->reopen_client(&_client, 0); } + inline bool is_alive() { return _client != nullptr; } + T* operator->() const { return _client; } private: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org