This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b7680fd95e2 [fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349)
b7680fd95e2 is described below

commit b7680fd95e2886737b1a08ccbeec525c38e36d56
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Fri Nov 3 12:28:45 2023 +0800

    [fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349)
---
 be/src/io/fs/broker_file_reader.cpp |  81 +++-------------------
 be/src/io/fs/broker_file_reader.h   |  11 ++-
 be/src/io/fs/broker_file_system.cpp | 135 ++++++++++++++++++++++++++----------
 be/src/io/fs/broker_file_system.h   |   7 +-
 be/src/runtime/client_cache.h       |   2 +
 5 files changed, 121 insertions(+), 115 deletions(-)

diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
index c48433ba23a..fa21e754342 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -35,19 +35,16 @@
 #include "io/fs/broker_file_system.h"
 #include "util/doris_metrics.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 class IOContext;
 
-BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const 
Path& path,
-                                   size_t file_size, TBrokerFD fd,
-                                   std::shared_ptr<BrokerFileSystem> fs)
-        : _path(path),
+BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path 
path, size_t file_size,
+                                   TBrokerFD fd, 
std::shared_ptr<BrokerFileSystem> fs)
+        : _path(std::move(path)),
           _file_size(file_size),
           _broker_addr(broker_addr),
           _fd(fd),
           _fs(std::move(fs)) {
-    _fs->get_client(&_client);
     DorisMetrics::instance()->broker_file_open_reading->increment(1);
     DorisMetrics::instance()->broker_file_reader_total->increment(1);
 }
@@ -59,32 +56,7 @@ BrokerFileReader::~BrokerFileReader() {
 Status BrokerFileReader::close() {
     bool expected = false;
     if (_closed.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
-        TBrokerCloseReaderRequest request;
-        request.__set_version(TBrokerVersion::VERSION_ONE);
-        request.__set_fd(_fd);
-
-        TBrokerOperationStatus response;
-        try {
-            try {
-                (*_client)->closeReader(response, request);
-            } catch (apache::thrift::transport::TTransportException&) {
-                std::this_thread::sleep_for(std::chrono::seconds(1));
-                RETURN_IF_ERROR((*_client).reopen());
-                (*_client)->closeReader(response, request);
-            }
-        } catch (apache::thrift::TException& e) {
-            std::stringstream ss;
-            ss << "Close broker reader failed, broker:" << _broker_addr << " 
failed:" << e.what();
-            return Status::RpcError(ss.str());
-        }
-
-        if (response.statusCode != TBrokerOperationStatusCode::OK) {
-            std::stringstream ss;
-            ss << "close broker reader failed, broker:" << _broker_addr
-               << " failed:" << response.message;
-            return Status::InternalError(ss.str());
-        }
-
+        RETURN_IF_ERROR(_fs->close_file(_fd));
         DorisMetrics::instance()->broker_file_open_reading->increment(-1);
     }
     return Status::OK();
@@ -100,45 +72,12 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes
         return Status::OK();
     }
 
-    TBrokerPReadRequest request;
-    request.__set_version(TBrokerVersion::VERSION_ONE);
-    request.__set_fd(_fd);
-    request.__set_offset(offset);
-    request.__set_length(bytes_req);
-
-    TBrokerReadResponse response;
-    try {
-        VLOG_RPC << "send pread request to broker:" << _broker_addr << " 
position:" << offset
-                 << ", read bytes length:" << bytes_req;
-        try {
-            (*_client)->pread(response, request);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            std::this_thread::sleep_for(std::chrono::seconds(1));
-            RETURN_IF_ERROR((*_client).reopen());
-            LOG(INFO) << "retry reading from broker: " << _broker_addr << ". 
reason: " << e.what();
-            (*_client)->pread(response, request);
-        }
-    } catch (apache::thrift::TException& e) {
-        std::stringstream ss;
-        ss << "Open broker reader failed, broker:" << _broker_addr << " 
failed:" << e.what();
-        return Status::RpcError(ss.str());
-    }
-
-    if (response.opStatus.statusCode == 
TBrokerOperationStatusCode::END_OF_FILE) {
-        // read the end of broker's file
-        *bytes_read = 0;
-        return Status::OK();
-    } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) 
{
-        std::stringstream ss;
-        ss << "Open broker reader failed, broker:" << _broker_addr
-           << " failed:" << response.opStatus.message;
-        return Status::InternalError(ss.str());
-    }
+    std::string data;
+    RETURN_IF_ERROR(_fs->read_file(_fd, offset, bytes_req, &data));
 
-    *bytes_read = response.data.size();
-    memcpy(to, response.data.data(), *bytes_read);
+    *bytes_read = data.size();
+    memcpy(to, data.data(), *bytes_read);
     return Status::OK();
 }
 
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
index a7f126ba0db..3f51fb979c3 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -32,15 +32,14 @@
 #include "runtime/client_cache.h"
 #include "util/slice.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 
 class IOContext;
 
 class BrokerFileReader : public FileReader {
 public:
-    BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, 
size_t file_size,
-                     TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs);
+    BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t 
file_size, TBrokerFD fd,
+                     std::shared_ptr<BrokerFileSystem> fs);
 
     ~BrokerFileReader() override;
 
@@ -66,8 +65,6 @@ private:
     TBrokerFD _fd;
 
     std::shared_ptr<BrokerFileSystem> _fs;
-    std::shared_ptr<BrokerServiceConnection> _client;
     std::atomic<bool> _closed = false;
 };
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 3578fb91eaa..a3c93c04a7a 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -45,8 +45,7 @@
 #include "runtime/exec_env.h"
 #include "util/slice.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 
 #ifdef BE_TEST
 inline BrokerServiceClientCache* client_cache() {
@@ -70,8 +69,8 @@ inline const std::string& client_id(const TNetworkAddress& 
addr) {
 
 #ifndef CHECK_BROKER_CLIENT
 #define CHECK_BROKER_CLIENT(client)                         \
-    if (!client) {                                          \
-        return Status::IOError("init Broker client error"); \
+    if (!client || !client->is_alive()) {                   \
+        return Status::IOError("connect to broker failed"); \
     }
 #endif
 
@@ -90,8 +89,8 @@ BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& 
broker_addr,
 
 Status BrokerFileSystem::connect_impl() {
     Status status = Status::OK();
-    _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr,
-                                              config::thrift_rpc_timeout_ms, 
&status));
+    _connection = std::make_unique<BrokerServiceConnection>(client_cache(), 
_broker_addr,
+                                                            
config::thrift_rpc_timeout_ms, &status);
     return status;
 }
 
@@ -109,7 +108,7 @@ Status BrokerFileSystem::open_file_internal(const 
FileDescription& fd, const Pat
         RETURN_IF_ERROR(file_size_impl(abs_path, &fsize));
     }
 
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     TBrokerOpenReaderRequest request;
     request.__set_version(TBrokerVersion::VERSION_ONE);
     request.__set_path(abs_path);
@@ -121,11 +120,11 @@ Status BrokerFileSystem::open_file_internal(const 
FileDescription& fd, const Pat
     try {
         Status status;
         try {
-            (*_client)->openReader(*response, request);
+            (*_connection)->openReader(*response, request);
         } catch (apache::thrift::transport::TTransportException&) {
             std::this_thread::sleep_for(std::chrono::seconds(1));
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->openReader(*response, request);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->openReader(*response, request);
         }
     } catch (apache::thrift::TException& e) {
         return Status::RpcError("failed to open file {}: {}", 
abs_path.native(),
@@ -147,7 +146,7 @@ Status BrokerFileSystem::create_directory_impl(const Path& 
/*path*/, bool /*fail
 }
 
 Status BrokerFileSystem::delete_file_impl(const Path& file) {
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     try {
         // rm file from remote path
         TBrokerDeletePathRequest del_req;
@@ -157,10 +156,10 @@ Status BrokerFileSystem::delete_file_impl(const Path& 
file) {
         del_req.__set_properties(_broker_prop);
 
         try {
-            (*_client)->deletePath(del_rep, del_req);
+            (*_connection)->deletePath(del_rep, del_req);
         } catch (apache::thrift::transport::TTransportException&) {
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->deletePath(del_rep, del_req);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->deletePath(del_rep, del_req);
         }
 
         if (del_rep.statusCode == TBrokerOperationStatusCode::OK) {
@@ -187,7 +186,7 @@ Status BrokerFileSystem::batch_delete_impl(const 
std::vector<Path>& files) {
 }
 
 Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     *res = false;
     try {
         TBrokerCheckPathExistRequest check_req;
@@ -197,10 +196,10 @@ Status BrokerFileSystem::exists_impl(const Path& path, 
bool* res) const {
         check_req.__set_properties(_broker_prop);
 
         try {
-            (*_client)->checkPathExist(check_rep, check_req);
+            (*_connection)->checkPathExist(check_rep, check_req);
         } catch (apache::thrift::transport::TTransportException&) {
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->checkPathExist(check_rep, check_req);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->checkPathExist(check_rep, check_req);
         }
 
         if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -220,7 +219,7 @@ Status BrokerFileSystem::exists_impl(const Path& path, 
bool* res) const {
 }
 
 Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) 
const {
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     try {
         TBrokerFileSizeRequest req;
         req.__set_version(TBrokerVersion::VERSION_ONE);
@@ -229,10 +228,10 @@ Status BrokerFileSystem::file_size_impl(const Path& path, 
int64_t* file_size) co
 
         TBrokerFileSizeResponse resp;
         try {
-            (*_client)->fileSize(resp, req);
+            (*_connection)->fileSize(resp, req);
         } catch (apache::thrift::transport::TTransportException&) {
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->fileSize(resp, req);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->fileSize(resp, req);
         }
 
         if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -257,7 +256,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool 
only_file, std::vector<
     if (!(*exists)) {
         return Status::OK();
     }
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     Status status = Status::OK();
     try {
         // get existing files from remote path
@@ -270,10 +269,10 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool 
only_file, std::vector<
         list_req.__set_fileNameOnly(true); // we only need file name, not abs 
path
 
         try {
-            (*_client)->listPath(list_rep, list_req);
+            (*_connection)->listPath(list_rep, list_req);
         } catch (apache::thrift::transport::TTransportException&) {
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->listPath(list_rep, list_req);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->listPath(list_rep, list_req);
         }
 
         if (list_rep.opStatus.statusCode == 
TBrokerOperationStatusCode::FILE_NOT_FOUND) {
@@ -310,7 +309,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool 
only_file, std::vector<
 }
 
 Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& 
new_name) {
-    CHECK_BROKER_CLIENT(_client);
+    CHECK_BROKER_CLIENT(_connection);
     try {
         TBrokerOperationStatus op_status;
         TBrokerRenamePathRequest rename_req;
@@ -320,10 +319,10 @@ Status BrokerFileSystem::rename_impl(const Path& 
orig_name, const Path& new_name
         rename_req.__set_properties(_broker_prop);
 
         try {
-            (*_client)->renamePath(op_status, rename_req);
+            (*_connection)->renamePath(op_status, rename_req);
         } catch (apache::thrift::transport::TTransportException&) {
-            RETURN_IF_ERROR((*_client).reopen());
-            (*_client)->renamePath(op_status, rename_req);
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->renamePath(op_status, rename_req);
         }
 
         if (op_status.statusCode != TBrokerOperationStatusCode::OK) {
@@ -470,9 +469,76 @@ Status BrokerFileSystem::direct_download_impl(const Path& 
remote_file, std::stri
     return Status::OK();
 }
 
-Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* 
client) const {
-    CHECK_BROKER_CLIENT(_client);
-    *client = _client;
+Status BrokerFileSystem::read_file(const TBrokerFD& fd, size_t offset, size_t 
bytes_req,
+                                   std::string* data) const {
+    if (data == nullptr) {
+        return Status::InvalidArgument("data should be not null");
+    }
+    CHECK_BROKER_CLIENT(_connection);
+    TBrokerPReadRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_fd(fd);
+    request.__set_offset(offset);
+    request.__set_length(bytes_req);
+
+    TBrokerReadResponse response;
+    try {
+        VLOG_RPC << "send pread request to broker:" << _broker_addr << " 
position:" << offset
+                 << ", read bytes length:" << bytes_req;
+        try {
+            (*_connection)->pread(response, request);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_connection).reopen());
+            LOG(INFO) << "retry reading from broker: " << _broker_addr << ". 
reason: " << e.what();
+            (*_connection)->pread(response, request);
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "read broker file failed, broker:" << _broker_addr << " failed:" 
<< e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.opStatus.statusCode == 
TBrokerOperationStatusCode::END_OF_FILE) {
+        // read the end of broker's file
+        return Status::OK();
+    }
+    if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker:" << _broker_addr
+           << " failed:" << response.opStatus.message;
+        return Status::InternalError(ss.str());
+    }
+    *data = std::move(response.data);
+    return Status::OK();
+}
+
+Status BrokerFileSystem::close_file(const TBrokerFD& fd) const {
+    CHECK_BROKER_CLIENT(_connection);
+    TBrokerCloseReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_fd(fd);
+
+    TBrokerOperationStatus response;
+    try {
+        try {
+            (*_connection)->closeReader(response, request);
+        } catch (apache::thrift::transport::TTransportException&) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_connection).reopen());
+            (*_connection)->closeReader(response, request);
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "close broker file failed, broker:" << _broker_addr << " 
failed:" << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "close broker file failed, broker:" << _broker_addr << " 
failed:" << response.message;
+        return Status::InternalError(ss.str());
+    }
     return Status::OK();
 }
 
@@ -480,5 +546,4 @@ std::string BrokerFileSystem::error_msg(const std::string& 
err) const {
     return fmt::format("({}:{}), {}", _broker_addr.hostname, 
_broker_addr.port, err);
 }
 
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
index 1e29b10a744..49a2ced67d9 100644
--- a/be/src/io/fs/broker_file_system.h
+++ b/be/src/io/fs/broker_file_system.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <cstddef>
 #include <map>
 #include <memory>
 #include <string>
@@ -44,7 +45,9 @@ public:
 
     ~BrokerFileSystem() override = default;
 
-    Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const;
+    Status read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req, 
std::string* data) const;
+
+    Status close_file(const TBrokerFD& fd) const;
 
 protected:
     Status connect_impl() override;
@@ -80,7 +83,7 @@ private:
     const TNetworkAddress& _broker_addr;
     const std::map<std::string, std::string>& _broker_prop;
 
-    std::shared_ptr<BrokerServiceConnection> _client;
+    std::unique_ptr<BrokerServiceConnection> _connection;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index 7ba5cafabea..ff45055ca65 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -187,6 +187,8 @@ public:
 
     Status reopen() { return _client_cache->reopen_client(&_client, 0); }
 
+    inline bool is_alive() { return _client != nullptr; }
+
     T* operator->() const { return _client; }
 
 private:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to