This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5d3f0a267a8 [opt](scan) unify the local and remote scan bytes stats 
for all scanners for 2.1 (#45167)
5d3f0a267a8 is described below

commit 5d3f0a267a89a5954312e17831832a20e6d0dd4d
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Mon Dec 9 22:19:19 2024 -0800

    [opt](scan) unify the local and remote scan bytes stats for all scanners 
for 2.1 (#45167)
    
    Cherry-picks part of #40493.
    
    TODO: remote scan bytes are not yet recorded by the S3 file reader.
---
 .../schema_scanner/schema_backend_active_tasks.cpp |  4 ++-
 be/src/io/cache/block/block_file_segment.cpp       |  4 +--
 be/src/io/cache/block/block_file_segment.h         |  2 +-
 .../io/cache/block/cached_remote_file_reader.cpp   |  9 ++---
 be/src/io/cache/block/cached_remote_file_reader.h  |  5 +--
 be/src/io/fs/broker_file_reader.cpp                |  5 ++-
 be/src/io/fs/broker_file_reader.h                  |  2 --
 be/src/io/fs/file_reader.h                         |  2 +-
 be/src/io/fs/hdfs_file_reader.cpp                  | 10 ++++--
 be/src/io/fs/hdfs_file_reader.h                    |  1 -
 be/src/io/fs/local_file_reader.cpp                 |  5 ++-
 be/src/io/fs/s3_file_reader.cpp                    |  8 +++--
 be/src/io/fs/s3_file_reader.h                      |  1 -
 be/src/io/io_common.h                              | 42 ++++++++++++++++++++++
 be/src/runtime/query_statistics.cpp                |  8 +++++
 be/src/runtime/query_statistics.h                  | 13 +++++++
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 37 ++++++++++++-------
 be/src/vec/exec/format/orc/vorc_reader.h           |  1 -
 be/src/vec/exec/scan/new_olap_scanner.cpp          | 16 ++++++++-
 be/src/vec/exec/scan/new_olap_scanner.h            |  1 +
 be/src/vec/exec/scan/vfile_scanner.cpp             | 15 ++++++++
 be/src/vec/exec/scan/vfile_scanner.h               |  2 ++
 be/src/vec/exec/scan/vscanner.cpp                  | 17 ++++-----
 be/src/vec/exec/scan/vscanner.h                    | 11 +++++-
 be/test/io/cache/file_block_cache_test.cpp         | 14 ++++----
 .../org/apache/doris/catalog/InternalSchema.java   |  6 ++++
 .../java/org/apache/doris/catalog/SchemaTable.java |  2 ++
 .../java/org/apache/doris/plugin/AuditEvent.java   | 14 ++++++++
 .../org/apache/doris/plugin/audit/AuditLoader.java |  4 +++
 .../java/org/apache/doris/qe/AuditLogHelper.java   |  6 +++-
 .../WorkloadRuntimeStatusMgr.java                  |  5 +++
 gensrc/proto/data.proto                            |  2 ++
 gensrc/thrift/FrontendService.thrift               |  2 ++
 33 files changed, 222 insertions(+), 54 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp 
b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index 74e95f42032..a67b2600d23 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -34,6 +34,8 @@ std::vector<SchemaScanner::ColumnDesc> 
SchemaBackendActiveTasksScanner::_s_tbls_
         {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
         {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
         {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
         {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
         {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
         {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
@@ -93,4 +95,4 @@ Status 
SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_segment.cpp 
b/be/src/io/cache/block/block_file_segment.cpp
index 564ac9d776f..1d6d425593d 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -179,7 +179,7 @@ std::string FileBlock::get_path_in_local_cache() const {
     return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
 }
 
-Status FileBlock::read_at(Slice buffer, size_t read_offset) {
+Status FileBlock::read_at(Slice buffer, size_t read_offset, const IOContext* 
io_ctx) {
     Status st = Status::OK();
     std::shared_ptr<FileReader> reader;
     if (!(reader = _cache_reader.lock())) {
@@ -192,7 +192,7 @@ Status FileBlock::read_at(Slice buffer, size_t read_offset) 
{
         }
     }
     size_t bytes_reads = buffer.size;
-    RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
+    RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads, 
io_ctx));
     DCHECK(bytes_reads == buffer.size);
     return st;
 }
diff --git a/be/src/io/cache/block/block_file_segment.h 
b/be/src/io/cache/block/block_file_segment.h
index 67f4fc17a0d..d1e341d42c6 100644
--- a/be/src/io/cache/block/block_file_segment.h
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -110,7 +110,7 @@ public:
     Status append(Slice data);
 
     // read data from cache file
-    Status read_at(Slice buffer, size_t read_offset);
+    Status read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx);
 
     // finish write, release the file writer
     Status finalize_write();
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp 
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index bbd7516dfaa..f8fda4b028e 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -112,7 +112,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t 
offset, Slice result, siz
         RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result, 
bytes_read, io_ctx));
         DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
         if (io_ctx->file_cache_stats) {
-            stats.bytes_read += bytes_req;
             _update_state(stats, io_ctx->file_cache_stats);
         }
         return Status::OK();
@@ -142,7 +141,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t 
offset, Slice result, siz
             break;
         }
     }
-    stats.bytes_read += bytes_req;
     size_t empty_start = 0;
     size_t empty_end = 0;
     if (!empty_segments.empty()) {
@@ -224,8 +222,9 @@ Status CachedRemoteFileReader::_read_from_cache(size_t 
offset, Slice result, siz
         size_t file_offset = current_offset - left;
         {
             SCOPED_RAW_TIMER(&stats.local_read_timer);
-            RETURN_IF_ERROR(segment->read_at(
-                    Slice(result.data + (current_offset - offset), read_size), 
file_offset));
+            RETURN_IF_ERROR(
+                    segment->read_at(Slice(result.data + (current_offset - 
offset), read_size),
+                                     file_offset, io_ctx));
         }
         *bytes_read += read_size;
         current_offset = right + 1;
@@ -280,10 +279,8 @@ void CachedRemoteFileReader::_update_state(const 
ReadStatistics& read_stats,
     }
     if (read_stats.hit_cache) {
         statis->num_local_io_total++;
-        statis->bytes_read_from_local += read_stats.bytes_read;
     } else {
         statis->num_remote_io_total++;
-        statis->bytes_read_from_remote += read_stats.bytes_read;
     }
     statis->remote_io_timer += read_stats.remote_read_timer;
     statis->local_io_timer += read_stats.local_read_timer;
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h 
b/be/src/io/cache/block/cached_remote_file_reader.h
index e4e280d95f6..af33e5d6f15 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.h
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -66,10 +66,11 @@ private:
     IFileCache::Key _cache_key;
     CloudFileCachePtr _cache;
 
+    // Records read/write timers and cache-related metrics;
+    // these metrics are eventually saved into FileCacheStatistics.
     struct ReadStatistics {
         bool hit_cache = true;
         bool skip_cache = false;
-        int64_t bytes_read = 0;
         int64_t bytes_write_into_file_cache = 0;
         int64_t remote_read_timer = 0;
         int64_t local_read_timer = 0;
@@ -82,4 +83,4 @@ private:
 };
 
 } // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.cpp 
b/be/src/io/fs/broker_file_reader.cpp
index 4d370cfb4d8..bb0eac47e38 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -62,7 +62,7 @@ Status BrokerFileReader::close() {
 }
 
 Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
-                                      const IOContext* /*io_ctx*/) {
+                                      const IOContext* io_ctx) {
     DCHECK(!closed());
     size_t bytes_req = result.size;
     char* to = result.data;
@@ -76,6 +76,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes
 
     *bytes_read = data.size();
     memcpy(to, data.data(), *bytes_read);
+    if (io_ctx && io_ctx->file_cache_stats) {
+        io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/broker_file_reader.h 
b/be/src/io/fs/broker_file_reader.h
index 7acdcbcc0d5..43a46c62331 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -34,8 +34,6 @@
 
 namespace doris::io {
 
-struct IOContext;
-
 class BrokerFileReader : public FileReader {
 public:
     BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t 
file_size, TBrokerFD fd,
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index 03828ef28dd..b41df4426ce 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -24,6 +24,7 @@
 
 #include "common/status.h"
 #include "io/fs/path.h"
+#include "io/io_common.h"
 #include "util/profile_collector.h"
 #include "util/slice.h"
 
@@ -32,7 +33,6 @@ namespace doris {
 namespace io {
 
 class FileSystem;
-struct IOContext;
 
 enum class FileCachePolicy : uint8_t {
     NO_CACHE,
diff --git a/be/src/io/fs/hdfs_file_reader.cpp 
b/be/src/io/fs/hdfs_file_reader.cpp
index 263276768bc..ac75e2e722b 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -84,7 +84,7 @@ Status HdfsFileReader::close() {
 
 #ifdef USE_HADOOP_HDFS
 Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
-                                    const IOContext* /*io_ctx*/) {
+                                    const IOContext* io_ctx) {
     DCHECK(!closed());
     if (offset > _handle->file_size()) {
         return Status::IOError("offset exceeds file size(offset: {}, file 
size: {}, path: {})",
@@ -121,6 +121,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
         has_read += loop_read;
     }
     *bytes_read = has_read;
+    if (io_ctx && io_ctx->file_cache_stats) {
+        io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+    }
     return Status::OK();
 }
 
@@ -128,7 +131,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
 // The hedged read only support hdfsPread().
 // TODO: rethink here to see if there are some difference between hdfsPread() 
and hdfsRead()
 Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
-                                    const IOContext* /*io_ctx*/) {
+                                    const IOContext* io_ctx) {
     DCHECK(!closed());
     if (offset > _handle->file_size()) {
         return Status::IOError("offset exceeds file size(offset: {}, file 
size: {}, path: {})",
@@ -177,6 +180,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
         has_read += loop_read;
     }
     *bytes_read = has_read;
+    if (io_ctx && io_ctx->file_cache_stats) {
+        io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+    }
     return Status::OK();
 }
 #endif
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 6204859e600..0f4a3f14019 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -34,7 +34,6 @@
 
 namespace doris {
 namespace io {
-struct IOContext;
 
 class HdfsFileReader : public FileReader {
 public:
diff --git a/be/src/io/fs/local_file_reader.cpp 
b/be/src/io/fs/local_file_reader.cpp
index 93953eeddd9..c7abf2ad047 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -118,7 +118,7 @@ Status LocalFileReader::close() {
 }
 
 Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
-                                     const IOContext* /*io_ctx*/) {
+                                     const IOContext* io_ctx) {
     DCHECK(!closed());
     if (offset > _file_size) {
         return Status::InternalError(
@@ -148,6 +148,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_
             *bytes_read += res;
         }
     }
+    if (io_ctx && io_ctx->file_cache_stats) {
+        io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read;
+    }
     DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
     return Status::OK();
 }
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 005257c1312..2d97252319d 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -42,7 +42,6 @@
 
 namespace doris {
 namespace io {
-struct IOContext;
 bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
 bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
 bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
@@ -86,7 +85,7 @@ Status S3FileReader::close() {
 }
 
 Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
-                                  const IOContext* /*io_ctx*/) {
+                                  const IOContext* io_ctx) {
     DCHECK(!closed());
     if (offset > _file_size) {
         return Status::InternalError(
@@ -154,6 +153,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_rea
             LOG(INFO) << fmt::format("read s3 file {} succeed after {} times 
with {} ms sleeping",
                                      _path.native(), retry_count, 
total_sleep_time);
         }
+        // ATTN: Do not enable this; it may cause stack-use-after-scope.
+        // Will be refactored in the future.
+        // if (io_ctx && io_ctx->file_cache_stats) {
+        //     io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+        // }
         return Status::OK();
     }
     return Status::InternalError("failed to read from s3, exceeded maximum 
retries");
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index c9e9656fe58..50a19712674 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -35,7 +35,6 @@ namespace doris {
 class RuntimeProfile;
 
 namespace io {
-struct IOContext;
 
 class S3FileReader final : public FileReader {
 public:
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 80a594473dc..3bd92dc7c1a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -19,6 +19,8 @@
 
 #include <gen_cpp/Types_types.h>
 
+#include <sstream>
+
 namespace doris {
 
 enum class ReaderType : uint8_t {
@@ -45,6 +47,38 @@ struct FileCacheStatistics {
     int64_t write_cache_io_timer = 0;
     int64_t bytes_write_into_cache = 0;
     int64_t num_skip_cache_io_total = 0;
+
+    void update(const FileCacheStatistics& other) {
+        num_local_io_total += other.num_local_io_total;
+        num_remote_io_total += other.num_remote_io_total;
+        local_io_timer += other.local_io_timer;
+        bytes_read_from_local += other.bytes_read_from_local;
+        bytes_read_from_remote += other.bytes_read_from_remote;
+        remote_io_timer += other.remote_io_timer;
+        write_cache_io_timer += other.write_cache_io_timer;
+        write_cache_io_timer += other.write_cache_io_timer;
+        bytes_write_into_cache += other.bytes_write_into_cache;
+        num_skip_cache_io_total += other.num_skip_cache_io_total;
+    }
+
+    void reset() {
+        num_local_io_total = 0;
+        num_remote_io_total = 0;
+        local_io_timer = 0;
+        bytes_read_from_local = 0;
+        bytes_read_from_remote = 0;
+        remote_io_timer = 0;
+        write_cache_io_timer = 0;
+        bytes_write_into_cache = 0;
+        num_skip_cache_io_total = 0;
+    }
+
+    std::string debug_string() const {
+        std::stringstream ss;
+        ss << "bytes_read_from_local: " << bytes_read_from_local
+           << ", bytes_read_from_remote: " << bytes_read_from_remote;
+        return ss.str();
+    }
 };
 
 struct IOContext {
@@ -60,6 +94,14 @@ struct IOContext {
     int64_t expiration_time = 0;
     const TUniqueId* query_id = nullptr;             // Ref
     FileCacheStatistics* file_cache_stats = nullptr; // Ref
+
+    std::string debug_string() const {
+        if (file_cache_stats != nullptr) {
+            return file_cache_stats->debug_string();
+        } else {
+            return "no file cache stats";
+        }
+    }
 };
 
 } // namespace io
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index de950704180..4f87da1196b 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -32,6 +32,8 @@ void QueryStatistics::merge(const QueryStatistics& other) {
     cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
     shuffle_send_bytes += 
other.shuffle_send_bytes.load(std::memory_order_relaxed);
     shuffle_send_rows += 
other.shuffle_send_rows.load(std::memory_order_relaxed);
+    _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
+    _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
 
     int64_t other_peak_mem = 
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
     if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -51,6 +53,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
     statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
     statistics->set_returned_rows(returned_rows);
     statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+    
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+    
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
 }
 
 void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -64,12 +68,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* 
statistics) const {
             current_used_memory_bytes.load(std::memory_order_relaxed));
     
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
     
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+    
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+    
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
 }
 
 void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
     scan_rows = statistics.scan_rows();
     scan_bytes = statistics.scan_bytes();
     cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
+    _scan_bytes_from_local_storage = 
statistics.scan_bytes_from_local_storage();
+    _scan_bytes_from_remote_storage = 
statistics.scan_bytes_from_remote_storage();
 }
 
 void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index a9f6e192ec0..fcfbf48bb18 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -44,6 +44,8 @@ public:
             : scan_rows(0),
               scan_bytes(0),
               cpu_nanos(0),
+              _scan_bytes_from_local_storage(0),
+              _scan_bytes_from_remote_storage(0),
               returned_rows(0),
               max_peak_memory_bytes(0),
               current_used_memory_bytes(0),
@@ -65,6 +67,13 @@ public:
         this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
     }
 
+    void add_scan_bytes_from_local_storage(int64_t 
scan_bytes_from_local_storage) {
+        _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+    }
+    void add_scan_bytes_from_remote_storage(int64_t 
scan_bytes_from_remote_storage) {
+        _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
+    }
+
     void add_shuffle_send_bytes(int64_t delta_bytes) {
         this->shuffle_send_bytes.fetch_add(delta_bytes, 
std::memory_order_relaxed);
     }
@@ -95,6 +104,8 @@ public:
         cpu_nanos.store(0, std::memory_order_relaxed);
         shuffle_send_bytes.store(0, std::memory_order_relaxed);
         shuffle_send_rows.store(0, std::memory_order_relaxed);
+        _scan_bytes_from_local_storage.store(0, std::memory_order_relaxed);
+        _scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed);
 
         returned_rows = 0;
         max_peak_memory_bytes.store(0, std::memory_order_relaxed);
@@ -120,6 +131,8 @@ private:
     std::atomic<int64_t> scan_rows;
     std::atomic<int64_t> scan_bytes;
     std::atomic<int64_t> cpu_nanos;
+    std::atomic<int64_t> _scan_bytes_from_local_storage;
+    std::atomic<int64_t> _scan_bytes_from_remote_storage;
     // number rows returned by query.
     // only set once by result sink when closing.
     int64_t returned_rows;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 1d9bb34d09d..104a22fb8b9 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -225,28 +225,41 @@ void 
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
     int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
 
     // block's schema come from 
SchemaBackendActiveTasksScanner::_s_tbls_columns
+    // In 2.1.7 and earlier, the "backend_active_tasks" table has 12 columns;
+    // starting from 2.1.8, 2 new columns (LOCAL_SCAN_BYTES, REMOTE_SCAN_BYTES) are added.
+    // Check the column count to stay compatible with 2.1.7 and earlier.
+    bool need_local_and_remote_bytes = (block->columns() > 12);
     for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+        int col_idx = 0;
         TQueryStatistics tqs;
         qs_ctx_ptr->collect_query_statistics(&tqs);
-        SchemaScannerHelper::insert_int64_value(0, be_id, block);
-        SchemaScannerHelper::insert_string_value(1, 
qs_ctx_ptr->_fe_addr.hostname, block);
-        SchemaScannerHelper::insert_string_value(2, query_id, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, be_id, block);
+        SchemaScannerHelper::insert_string_value(col_idx++, 
qs_ctx_ptr->_fe_addr.hostname, block);
+        SchemaScannerHelper::insert_string_value(col_idx++, query_id, block);
 
         int64_t task_time = qs_ctx_ptr->_is_query_finished
                                     ? qs_ctx_ptr->_query_finish_time - 
qs_ctx_ptr->_query_start_time
                                     : MonotonicMillis() - 
qs_ctx_ptr->_query_start_time;
-        SchemaScannerHelper::insert_int64_value(3, task_time, block);
-        SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
-        SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
-        SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block);
-        SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes, 
block);
-        SchemaScannerHelper::insert_int64_value(8, 
tqs.current_used_memory_bytes, block);
-        SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes, 
block);
-        SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, 
block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, task_time, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, tqs.cpu_ms, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_rows, 
block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes, 
block);
+
+        if (need_local_and_remote_bytes) {
+            SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.scan_bytes_from_local_storage,
+                                                    block);
+            SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.scan_bytes_from_remote_storage,
+                                                    block);
+        }
+
+        SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.max_peak_memory_bytes, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.current_used_memory_bytes, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.shuffle_send_bytes, block);
+        SchemaScannerHelper::insert_int64_value(col_idx++, 
tqs.shuffle_send_rows, block);
 
         std::stringstream ss;
         ss << qs_ctx_ptr->_query_type;
-        SchemaScannerHelper::insert_string_value(11, ss.str(), block);
+        SchemaScannerHelper::insert_string_value(col_idx++, ss.str(), block);
     }
 }
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0807f4949e5..b286b714ad9 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -667,7 +667,6 @@ public:
     io::FileReaderSPtr& get_inner_reader() { return _inner_reader; }
 
 protected:
-    void _collect_profile_at_runtime() override {};
     void _collect_profile_before_close() override;
 
 private:
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index aaf6fbdf3b4..22360200d79 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -656,7 +656,6 @@ void NewOlapScanner::_collect_profile_before_close() {
     COUNTER_UPDATE(Parent->_total_segment_counter, stats.total_segment_number);
 
     // Update counters for NewOlapScanner
-    // Update counters from tablet reader's stats
     auto& stats = _tablet_reader->stats();
 
     if (_parent) {
@@ -678,4 +677,19 @@ void NewOlapScanner::_collect_profile_before_close() {
     tablet->query_scan_count->increment(1);
 }
 
+void NewOlapScanner::_update_bytes_and_rows_read() {
+    VScanner::_update_bytes_and_rows_read();
+    if (_query_statistics) {
+        auto& stats = _tablet_reader->stats();
+        int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - 
_bytes_read_from_local;
+        int64_t delta_remote =
+                stats.file_cache_stats.bytes_read_from_remote - 
_bytes_read_from_remote;
+        _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+        _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+        _query_statistics->add_scan_bytes(delta_local + delta_remote);
+        _bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local;
+        _bytes_read_from_remote = 
stats.file_cache_stats.bytes_read_from_remote;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index cdadf8f7f49..90d871734c3 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -80,6 +80,7 @@ public:
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     void _collect_profile_before_close() override;
+    void _update_bytes_and_rows_read() override;
 
 private:
     void _update_realtime_counters();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index e3899d96982..331b49b2082 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -1236,4 +1236,19 @@ void VFileScanner::_collect_profile_before_close() {
     }
 }
 
+void VFileScanner::_update_bytes_and_rows_read() {
+    VScanner::_update_bytes_and_rows_read();
+    if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) {
+        int64_t delta_local =
+                _io_ctx->file_cache_stats->bytes_read_from_local - 
_bytes_read_from_local;
+        int64_t delta_remote =
+                _io_ctx->file_cache_stats->bytes_read_from_remote - 
_bytes_read_from_remote;
+        _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+        _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+        _query_statistics->add_scan_bytes(delta_local + delta_remote);
+        _bytes_read_from_local = 
_io_ctx->file_cache_stats->bytes_read_from_local;
+        _bytes_read_from_remote = 
_io_ctx->file_cache_stats->bytes_read_from_remote;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index cf1ea97f21b..1c6d903a87f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -97,6 +97,8 @@ protected:
 
     void _collect_profile_before_close() override;
 
+    void _update_bytes_and_rows_read() override;
+
 protected:
     const TFileScanRangeParams* _params = nullptr;
     std::shared_ptr<vectorized::SplitSourceConnector> _split_source;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 97a2ba8207a..58511e890d6 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -118,8 +118,7 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
         }
     }
 
-    int64_t old_scan_rows = _num_rows_read;
-    int64_t old_scan_bytes = _num_byte_read;
+    _prev_num_rows_read = _num_rows_read;
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to 
get next rows,
@@ -138,7 +137,6 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                     break;
                 }
                 _num_rows_read += block->rows();
-                _num_byte_read += block->allocated_bytes();
             }
 
             // 2. Filter the output block finally.
@@ -153,10 +151,7 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
-    if (_query_statistics) {
-        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
-        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
-    }
+    _update_bytes_and_rows_read();
 
     if (state->is_cancelled()) {
         // TODO: Should return the specific ErrorStatus instead of just 
Cancelled.
@@ -281,7 +276,6 @@ void VScanner::_collect_profile_before_close() {
     if (_parent) {
         COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
-        COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
     } else {
         COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
@@ -301,4 +295,11 @@ void VScanner::update_scan_cpu_timer() {
     }
 }
 
+void VScanner::_update_bytes_and_rows_read() {
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read);
+        _prev_num_rows_read = _num_rows_read;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e85c6082ca6..03604621f05 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -135,6 +135,10 @@ public:
 
     void update_scan_cpu_timer();
 
+    // Push the rows (and, in overriding scanners, the bytes) read since the
+    // previous round into the query statistics, so per-query runtime
+    // statistics stay current while the query runs.
+    // The base VScanner implementation only reports rows.
+    // NOTE(review): declared in the public section although the leading
+    // underscore suggests an internal hook; consider making it protected.
+    virtual void _update_bytes_and_rows_read();
+
     RuntimeState* runtime_state() { return _state; }
 
     bool is_open() { return _is_open; }
@@ -214,7 +218,12 @@ protected:
     // num of rows read from scanner
     int64_t _num_rows_read = 0;
 
-    int64_t _num_byte_read = 0;
+    // save the current _num_rows_read before next round,
+    // so that we can get delta rows between each round.
+    int64_t _prev_num_rows_read = 0;
+    // bytes read from local and remote fs
+    int64_t _bytes_read_from_local = 0;
+    int64_t _bytes_read_from_remote = 0;
 
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;
diff --git a/be/test/io/cache/file_block_cache_test.cpp 
b/be/test/io/cache/file_block_cache_test.cpp
index 20f97aae6ad..092a343c1a5 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -852,7 +852,7 @@ TEST(LRUFileCache, fd_cache_remove) {
         assert_range(2, segments[0], io::FileBlock::Range(0, 8), 
io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
0)));
     }
     {
@@ -864,7 +864,7 @@ TEST(LRUFileCache, fd_cache_remove) {
         assert_range(2, segments[0], io::FileBlock::Range(9, 9), 
io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
9)));
     }
     {
@@ -877,7 +877,7 @@ TEST(LRUFileCache, fd_cache_remove) {
                      io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
10)));
     }
     {
@@ -890,7 +890,7 @@ TEST(LRUFileCache, fd_cache_remove) {
                      io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
15)));
     }
     EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
@@ -933,7 +933,7 @@ TEST(LRUFileCache, fd_cache_evict) {
         assert_range(2, segments[0], io::FileBlock::Range(0, 8), 
io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
0)));
     }
     {
@@ -945,7 +945,7 @@ TEST(LRUFileCache, fd_cache_evict) {
         assert_range(2, segments[0], io::FileBlock::Range(9, 9), 
io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
9)));
     }
     {
@@ -958,7 +958,7 @@ TEST(LRUFileCache, fd_cache_evict) {
                      io::FileBlock::State::DOWNLOADING);
         download(segments[0]);
         std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
-        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
+        static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0, 
nullptr));
         EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 
10)));
     }
     EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index afe1f9af2da..cf827efbbdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -82,6 +82,12 @@ public class InternalSchema {
         AUDIT_SCHEMA.add(new ColumnDef("return_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
         AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
         AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA
+                .add(new ColumnDef("scan_bytes_from_local_storage", 
TypeDef.create(PrimitiveType.BIGINT),
+                        true));
+        AUDIT_SCHEMA
+                .add(new ColumnDef("scan_bytes_from_remote_storage", 
TypeDef.create(PrimitiveType.BIGINT),
+                        true));
         AUDIT_SCHEMA.add(new ColumnDef("stmt_id", 
TypeDef.create(PrimitiveType.BIGINT), true));
         AUDIT_SCHEMA.add(new ColumnDef("is_query", 
TypeDef.create(PrimitiveType.TINYINT), true));
         AUDIT_SCHEMA.add(new ColumnDef("is_nereids", 
TypeDef.create(PrimitiveType.TINYINT), true));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 785343d78cb..8f12300faea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -455,6 +455,8 @@ public class SchemaTable extends Table {
                                     .column("TASK_CPU_TIME_MS", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("SCAN_ROWS", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("SCAN_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("LOCAL_SCAN_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("REMOTE_SCAN_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("BE_PEAK_MEMORY_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("CURRENT_USED_MEMORY_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("SHUFFLE_SEND_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 7d64b600d8a..5e9dd3f4d4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -105,6 +105,12 @@ public class AuditEvent {
+    // Scan bytes split by storage location (local vs. remote); -1 when never set.
+    // Declared before fuzzyVariables, as required by the note below.
+    @AuditField(value = "scanBytesFromLocalStorage")
+    public long scanBytesFromLocalStorage = -1;
+    @AuditField(value = "scanBytesFromRemoteStorage")
+    public long scanBytesFromRemoteStorage = -1;
     // note: newly added fields should be always before fuzzyVariables
     @AuditField(value = "FuzzyVariables")
     public String fuzzyVariables = "";
 
     public long pushToAuditLogQueueTime;
 
@@ -249,6 +253,16 @@ public class AuditEvent {
             return this;
         }
 
+        /**
+         * Sets the number of bytes the query scanned from local storage.
+         *
+         * @param scanBytesFromLocalStorage scanned byte count (the field defaults to -1)
+         * @return this builder, for chaining
+         */
+        public AuditEventBuilder setScanBytesFromLocalStorage(long 
scanBytesFromLocalStorage) {
+            auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
+            return this;
+        }
+
+        /**
+         * Sets the number of bytes the query scanned from remote storage.
+         *
+         * @param scanBytesFromRemoteStorage scanned byte count (the field defaults to -1)
+         * @return this builder, for chaining
+         */
+        public AuditEventBuilder setScanBytesFromRemoteStorage(long 
scanBytesFromRemoteStorage) {
+            auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
+            return this;
+        }
+
         public AuditEvent build() {
             return this.auditEvent;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 55dbba9805e..6aac0364bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -151,6 +151,11 @@ public class AuditLoader extends Plugin implements AuditPlugin {
         logBuffer.append(event.queryTime).append("\t");
         logBuffer.append(event.scanBytes).append("\t");
         logBuffer.append(event.scanRows).append("\t");
         logBuffer.append(event.returnRows).append("\t");
         logBuffer.append(event.shuffleSendRows).append("\t");
         logBuffer.append(event.shuffleSendBytes).append("\t");
+        // Field order must match AUDIT_SCHEMA in InternalSchema, where
+        // scan_bytes_from_local_storage / scan_bytes_from_remote_storage
+        // come right after shuffle_send_bytes; each field is appended once.
+        logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
+        logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 031a01b32d2..e73ddd7aa86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -226,7 +226,11 @@ public class AuditLogHelper {
                     auditEventBuilder.setSqlDigest(sqlDigest);
                 }
             }
-            auditEventBuilder.setIsQuery(true);
+            auditEventBuilder.setIsQuery(true)
+                    .setScanBytesFromLocalStorage(
+                            statistics == null ? 0 : 
statistics.getScanBytesFromLocalStorage())
+                    .setScanBytesFromRemoteStorage(
+                            statistics == null ? 0 : 
statistics.getScanBytesFromRemoteStorage());
         } else {
             auditEventBuilder.setIsQuery(false);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index e27cb4e0df2..ce0703f23a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -84,6 +84,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
                     auditEvent.cpuTimeMs = queryStats.cpu_ms;
                     auditEvent.shuffleSendBytes = 
queryStats.shuffle_send_bytes;
                     auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
+                    auditEvent.scanBytesFromLocalStorage = 
queryStats.scan_bytes_from_local_storage;
+                    auditEvent.scanBytesFromRemoteStorage = 
queryStats.scan_bytes_from_remote_storage;
                 }
                 boolean ret = 
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
                 if (!ret) {
@@ -222,6 +224,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
             dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
         }
+        dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
+        dst.scan_bytes_from_remote_storage += 
src.scan_bytes_from_remote_storage;
     }
 
     private void queryAuditEventLogWriteLock() {
@@ -232,3 +236,4 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         queryAuditEventLock.writeLock().unlock();
     }
 }
+
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index e9ced523912..755a3a042db 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -35,6 +35,8 @@ message PQueryStatistics {
     optional int64 cpu_ms = 4;
     optional int64 max_peak_memory_bytes = 5;
     repeated PNodeStatistics nodes_statistics = 6;
+    optional int64 scan_bytes_from_local_storage = 7;
+    optional int64 scan_bytes_from_remote_storage = 8;
 }
 
 message PRowBatch {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 4522fd08f68..638edf1a7bb 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -412,6 +412,8 @@ struct TQueryStatistics {
     7: optional i64 workload_group_id
     8: optional i64 shuffle_send_bytes
     9: optional i64 shuffle_send_rows
+    10: optional i64 scan_bytes_from_local_storage
+    11: optional i64 scan_bytes_from_remote_storage
 }
 
 struct TReportWorkloadRuntimeStatusParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to