This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 57729bad68 [Enhancement](multi-catalog) Add hdfs read statistics profile. (#21442)
57729bad68 is described below

commit 57729bad6841ea9728e6b2cf0bd484133e7b9ead
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Fri Jul 7 14:52:14 2023 +0800

    [Enhancement](multi-catalog) Add hdfs read statistics profile. (#21442)
    
    Add hdfs read statistics profile.
    ```
      -  HdfsIO:  0ns
        -  TotalBytesRead:  133.47  MB
        -  TotalLocalBytesRead:  133.47  MB
        -  TotalShortCircuitBytesRead:  133.47  MB
        -  TotalZeroCopyBytesRead:  0.00
    ```
---
 be/src/io/file_factory.cpp                 |  9 ++++---
 be/src/io/file_factory.h                   |  3 ++-
 be/src/io/fs/benchmark/hdfs_benchmark.hpp  | 10 ++++----
 be/src/io/fs/hdfs_file_reader.cpp          | 40 ++++++++++++++++++++++++++++--
 be/src/io/fs/hdfs_file_reader.h            | 16 +++++++++++-
 be/src/io/fs/hdfs_file_system.cpp          | 12 +++++----
 be/src/io/fs/hdfs_file_system.h            |  7 ++++--
 be/src/runtime/snapshot_loader.cpp         |  2 +-
 be/src/vec/runtime/vfile_result_writer.cpp |  2 +-
 9 files changed, 79 insertions(+), 22 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index af32c81982..686c9a1b33 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
         RETURN_IF_ERROR(fs->create_file(path, &file_writer));
         break;
     }
@@ -128,7 +128,7 @@ Status FileFactory::create_file_reader(const FileSystemProperties& system_proper
     }
     case TFileType::FILE_HDFS: {
         RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, 
file_description.path,
-                                           file_system, file_reader, 
reader_options));
+                                           file_system, file_reader, 
reader_options, profile));
         break;
     }
     case TFileType::FILE_BROKER: {
@@ -168,9 +168,10 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
 Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
                                        std::shared_ptr<io::FileSystem>* 
hdfs_file_system,
                                        io::FileReaderSPtr* reader,
-                                       const io::FileReaderOptions& 
reader_options) {
+                                       const io::FileReaderOptions& 
reader_options,
+                                       RuntimeProfile* profile) {
     std::shared_ptr<io::HdfsFileSystem> fs;
-    RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+    RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
     RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
     *hdfs_file_system = std::move(fs);
     return Status::OK();
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index b5cbcdfc7c..0f9dee60d0 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -81,7 +81,8 @@ public:
     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
                                      std::shared_ptr<io::FileSystem>* 
hdfs_file_system,
                                      io::FileReaderSPtr* reader,
-                                     const io::FileReaderOptions& 
reader_options);
+                                     const io::FileReaderOptions& 
reader_options,
+                                     RuntimeProfile* profile);
 
     static Status create_s3_reader(const std::map<std::string, std::string>& 
prop,
                                    const std::string& path,
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index b508e14a24..d6ad059b77 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -50,8 +50,8 @@ public:
         io::FileReaderSPtr reader;
         io::FileReaderOptions reader_opts = 
FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        RETURN_IF_ERROR(
-                FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, 
&reader, reader_opts));
+        RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, 
file_path, &fs, &reader,
+                                                        reader_opts, nullptr));
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end 
- start);
@@ -94,7 +94,7 @@ public:
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
         RETURN_IF_ERROR(fs->create_file(file_path, &writer));
         return write(state, writer.get());
     }
@@ -115,7 +115,7 @@ public:
         auto new_file_path = file_path + "_new";
         THdfsParams hdfs_params = parse_properties(_conf_map);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
 
         auto start = std::chrono::high_resolution_clock::now();
         RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
@@ -142,7 +142,7 @@ public:
 
         std::shared_ptr<io::HdfsFileSystem> fs;
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
 
         auto start = std::chrono::high_resolution_clock::now();
         bool res = false;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 402cdb3faf..cf3a2b6563 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -36,12 +36,29 @@ namespace doris {
 namespace io {
 
 HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
-                               FileHandleCache::Accessor accessor)
-        : _path(std::move(path)), _name_node(name_node), 
_accessor(std::move(accessor)) {
+                               FileHandleCache::Accessor accessor, 
RuntimeProfile* profile)
+        : _path(std::move(path)),
+          _name_node(name_node),
+          _accessor(std::move(accessor)),
+          _profile(profile) {
     _handle = _accessor.get();
 
     DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
     DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
+    if (_profile != nullptr) {
+#ifdef USE_HADOOP_HDFS
+        const char* hdfs_profile_name = "HdfsIO";
+        ADD_TIMER(_profile, hdfs_profile_name);
+        _hdfs_profile.total_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, 
hdfs_profile_name);
+        _hdfs_profile.total_local_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", 
TUnit::BYTES, hdfs_profile_name);
+        _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
+                _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, 
hdfs_profile_name);
+        _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
+                _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, 
hdfs_profile_name);
+#endif
+    }
 }
 
 HdfsFileReader::~HdfsFileReader() {
@@ -52,6 +69,25 @@ Status HdfsFileReader::close() {
     bool expected = false;
     if (_closed.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
         DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
+        if (_profile != nullptr) {
+#ifdef USE_HADOOP_HDFS
+            struct hdfsReadStatistics* hdfs_statistics = nullptr;
+            auto r = hdfsFileGetReadStatistics(_handle->file(), 
&hdfs_statistics);
+            if (r != 0) {
+                return Status::InternalError(
+                        fmt::format("Failed to run 
hdfsFileGetReadStatistics(): {}", r));
+            }
+            COUNTER_UPDATE(_hdfs_profile.total_bytes_read, 
hdfs_statistics->totalBytesRead);
+            COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
+                           hdfs_statistics->totalLocalBytesRead);
+            COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
+                           hdfs_statistics->totalShortCircuitBytesRead);
+            COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
+                           hdfs_statistics->totalZeroCopyBytesRead);
+            hdfsFileFreeReadStatistics(hdfs_statistics);
+            hdfsFileClearReadStatistics(_handle->file());
+#endif
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index efff1bfcd6..864a55bc41 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -38,7 +38,8 @@ class IOContext;
 
 class HdfsFileReader : public FileReader {
 public:
-    HdfsFileReader(Path path, const std::string& name_node, 
FileHandleCache::Accessor accessor);
+    HdfsFileReader(Path path, const std::string& name_node, 
FileHandleCache::Accessor accessor,
+                   RuntimeProfile* profile);
 
     ~HdfsFileReader() override;
 
@@ -57,11 +58,24 @@ protected:
                         const IOContext* io_ctx) override;
 
 private:
+#ifdef USE_HADOOP_HDFS
+    struct HDFSProfile {
+        RuntimeProfile::Counter* total_bytes_read;
+        RuntimeProfile::Counter* total_local_bytes_read;
+        RuntimeProfile::Counter* total_short_circuit_bytes_read;
+        RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
+    };
+#endif
+
     Path _path;
     const std::string& _name_node;
     FileHandleCache::Accessor _accessor;
     CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
     std::atomic<bool> _closed = false;
+    RuntimeProfile* _profile;
+#ifdef USE_HADOOP_HDFS
+    HDFSProfile _hdfs_profile;
+#endif
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 775754bd4d..5e90f04dba 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -123,7 +123,7 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs,
 }
 
 Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const 
std::string& path,
-                              std::shared_ptr<HdfsFileSystem>* fs) {
+                              RuntimeProfile* profile, 
std::shared_ptr<HdfsFileSystem>* fs) {
 #ifdef USE_HADOOP_HDFS
     if (!config::enable_java_support) {
         return Status::InternalError(
@@ -131,14 +131,16 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string&
                 "true.");
     }
 #endif
-    (*fs).reset(new HdfsFileSystem(hdfs_params, path));
+    (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
     return (*fs)->connect();
 }
 
-HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const 
std::string& path)
+HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const 
std::string& path,
+                               RuntimeProfile* profile)
         : RemoteFileSystem(path, "", FileSystemType::HDFS),
           _hdfs_params(hdfs_params),
-          _fs_handle(nullptr) {
+          _fs_handle(nullptr),
+          _profile(profile) {
     _namenode = _hdfs_params.fs_name;
 }
 
@@ -175,7 +177,7 @@ Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size,
             std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), 
real_path, 0, file_size,
             &accessor));
 
-    *reader = std::make_shared<HdfsFileReader>(file, _namenode, 
std::move(accessor));
+    *reader = std::make_shared<HdfsFileReader>(file, _namenode, 
std::move(accessor), _profile);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index bd28ec73c2..6a45a92b37 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -33,6 +33,7 @@
 #include "io/fs/hdfs.h"
 #include "io/fs/path.h"
 #include "io/fs/remote_file_system.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 class THdfsParams;
@@ -111,7 +112,7 @@ class HdfsFileHandleCache;
 class HdfsFileSystem final : public RemoteFileSystem {
 public:
     static Status create(const THdfsParams& hdfs_params, const std::string& 
path,
-                         std::shared_ptr<HdfsFileSystem>* fs);
+                         RuntimeProfile* profile, 
std::shared_ptr<HdfsFileSystem>* fs);
 
     ~HdfsFileSystem() override;
 
@@ -148,12 +149,14 @@ private:
 
 private:
     friend class HdfsFileWriter;
-    HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
+    HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
+                   RuntimeProfile* profile);
     const THdfsParams& _hdfs_params;
     std::string _namenode;
     // do not use std::shared_ptr or std::unique_ptr
     // _fs_handle is managed by HdfsFileSystemCache
     HdfsFileSystemHandle* _fs_handle;
+    RuntimeProfile* _profile;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index f1b58fa454..3ff8229bc3 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
     } else if (TStorageBackendType::type::HDFS == type) {
         THdfsParams hdfs_params = parse_properties(_prop);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
         _remote_fs = std::move(fs);
     } else if (TStorageBackendType::type::BROKER == type) {
         std::shared_ptr<io::BrokerFileSystem> fs;
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index ed408e5f7a..8977cd0c47 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() {
     case TStorageBackendType::HDFS: {
         THdfsParams hdfs_params = 
parse_properties(_file_opts->broker_properties);
         std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&hdfs_fs));
         file_system = hdfs_fs;
         break;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to