This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new da1cd324ae0 (cloud-merge) Add file cache stats for queries to audit log (#34140)
da1cd324ae0 is described below

commit da1cd324ae0d74b3241d5893a857a91bb578a70b
Author: Lightman <31928846+lchangli...@users.noreply.github.com>
AuthorDate: Sat Apr 27 13:54:41 2024 +0800

    (cloud-merge) Add file cache stats for queries to audit log (#34140)
---
 be/src/runtime/query_statistics.cpp                        | 10 ++++++++++
 be/src/runtime/query_statistics.h                          | 12 ++++++++++++
 be/src/vec/exec/scan/new_olap_scanner.cpp                  |  6 ++++++
 be/test/vec/olap/vertical_compaction_test.cpp              |  4 ----
 .../java/org/apache/doris/plugin/audit/AuditEvent.java     | 14 ++++++++++++++
 .../src/main/java/org/apache/doris/qe/AuditLogHelper.java  |  6 +++++-
 .../workloadschedpolicy/WorkloadRuntimeStatusMgr.java      |  4 ++++
 gensrc/proto/data.proto                                    |  2 ++
 gensrc/thrift/FrontendService.thrift                       |  2 ++
 9 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index de950704180..551f518f22f 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -32,6 +32,10 @@ void QueryStatistics::merge(const QueryStatistics& other) {
     cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
     shuffle_send_bytes += 
other.shuffle_send_bytes.load(std::memory_order_relaxed);
     shuffle_send_rows += 
other.shuffle_send_rows.load(std::memory_order_relaxed);
+    _scan_bytes_from_local_storage +=
+            
other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
+    _scan_bytes_from_remote_storage +=
+            
other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);
 
     int64_t other_peak_mem = 
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
     if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -51,6 +55,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
     statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
     statistics->set_returned_rows(returned_rows);
     statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+    
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+    
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
 }
 
 void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -64,12 +70,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* 
statistics) const {
             current_used_memory_bytes.load(std::memory_order_relaxed));
     
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
     
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+    
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+    
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
 }
 
 void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
     scan_rows = statistics.scan_rows();
     scan_bytes = statistics.scan_bytes();
     cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
+    _scan_bytes_from_local_storage = 
statistics.scan_bytes_from_local_storage();
+    _scan_bytes_from_remote_storage = 
statistics.scan_bytes_from_remote_storage();
 }
 
 void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index a9f6e192ec0..0a1c5c9f7ba 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -73,6 +73,14 @@ public:
         this->shuffle_send_rows.fetch_add(delta_rows, 
std::memory_order_relaxed);
     }
 
+    void add_scan_bytes_from_local_storage(int64_t 
scan_bytes_from_local_storage) {
+        this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+    }
+
+    void add_scan_bytes_from_remote_storage(int64_t 
scan_bytes_from_remote_storage) {
+        this->_scan_bytes_from_remote_storage += 
scan_bytes_from_remote_storage;
+    }
+
     void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; 
}
 
     void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
@@ -95,6 +103,8 @@ public:
         cpu_nanos.store(0, std::memory_order_relaxed);
         shuffle_send_bytes.store(0, std::memory_order_relaxed);
         shuffle_send_rows.store(0, std::memory_order_relaxed);
+        _scan_bytes_from_local_storage.store(0);
+        _scan_bytes_from_remote_storage.store(0);
 
         returned_rows = 0;
         max_peak_memory_bytes.store(0, std::memory_order_relaxed);
@@ -120,6 +130,8 @@ private:
     std::atomic<int64_t> scan_rows;
     std::atomic<int64_t> scan_bytes;
     std::atomic<int64_t> cpu_nanos;
+    std::atomic<int64_t> _scan_bytes_from_local_storage;
+    std::atomic<int64_t> _scan_bytes_from_remote_storage;
     // number rows returned by query.
     // only set once by result sink when closing.
     int64_t returned_rows;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index c0bef6b3d8a..f65f814f569 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -688,6 +688,12 @@ void NewOlapScanner::_collect_profile_before_close() {
     tablet->query_scan_bytes->increment(_compressed_bytes_read);
     tablet->query_scan_rows->increment(_raw_rows_read);
     tablet->query_scan_count->increment(1);
+    if (_query_statistics) {
+        _query_statistics->add_scan_bytes_from_local_storage(
+                stats.file_cache_stats.bytes_read_from_local);
+        _query_statistics->add_scan_bytes_from_remote_storage(
+                stats.file_cache_stats.bytes_read_from_remote);
+    }
 }
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp 
b/be/test/vec/olap/vertical_compaction_test.cpp
index a98f9c2944c..a4feb3db535 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -97,10 +97,6 @@ protected:
         auto engine = std::make_unique<StorageEngine>(options);
         engine_ref = engine.get();
         ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
-        io::FileCacheSettings cache_setting;
-        ASSERT_TRUE(io::FileCacheFactory::instance()
-                            ->create_file_cache(absolute_dir + "/tablet_path", 
cache_setting)
-                            .ok());
 
         _data_dir = new DataDir(*engine_ref, absolute_dir, 100000000);
         static_cast<void>(_data_dir->init());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index 6a5fe19fcc6..ac68b38e258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -107,6 +107,10 @@ public class AuditEvent {
     // note: newly added fields should be always before fuzzyVariables
     @AuditField(value = "FuzzyVariables")
     public String fuzzyVariables = "";
+    @AuditField(value = "scanBytesFromLocalStorage")
+    public long scanBytesFromLocalStorage = -1;
+    @AuditField(value = "scanBytesFromRemoteStorage")
+    public long scanBytesFromRemoteStorage = -1;
 
     public long pushToAuditLogQueueTime;
 
@@ -251,6 +255,16 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setScanBytesFromLocalStorage(long 
scanBytesFromLocalStorage) {
+            auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
+            return this;
+        }
+
+        public AuditEventBuilder setScanBytesFromRemoteStorage(long 
scanBytesFromRemoteStorage) {
+            auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
+            return this;
+        }
+
         public AuditEvent build() {
             return this.auditEvent;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 17a0abe3468..edad7780512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -111,7 +111,11 @@ public class AuditLogHelper {
                     auditEventBuilder.setSqlDigest(sqlDigest);
                 }
             }
-            auditEventBuilder.setIsQuery(true);
+            auditEventBuilder.setIsQuery(true)
+                    .setScanBytesFromLocalStorage(
+                            statistics == null ? 0 : 
statistics.getScanBytesFromLocalStorage())
+                    .setScanBytesFromRemoteStorage(
+                            statistics == null ? 0 : 
statistics.getScanBytesFromRemoteStorage());
         } else {
             auditEventBuilder.setIsQuery(false);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index ff2641d5f3c..983643ec49b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -68,6 +68,8 @@ public class WorkloadRuntimeStatusMgr {
                 if (queryStats != null) {
                     auditEvent.scanRows = queryStats.scan_rows;
                     auditEvent.scanBytes = queryStats.scan_bytes;
+                    auditEvent.scanBytesFromLocalStorage = 
queryStats.scan_bytes_from_local_storage;
+                    auditEvent.scanBytesFromRemoteStorage = 
queryStats.scan_bytes_from_remote_storage;
                     auditEvent.peakMemoryBytes = 
queryStats.max_peak_memory_bytes;
                     auditEvent.cpuTimeMs = queryStats.cpu_ms;
                     auditEvent.shuffleSendBytes = 
queryStats.shuffle_send_bytes;
@@ -176,6 +178,8 @@ public class WorkloadRuntimeStatusMgr {
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics 
src) {
         dst.scan_rows += src.scan_rows;
         dst.scan_bytes += src.scan_bytes;
+        dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
+        dst.scan_bytes_from_remote_storage += 
src.scan_bytes_from_remote_storage;
         dst.cpu_ms += src.cpu_ms;
         dst.shuffle_send_bytes += src.shuffle_send_bytes;
         dst.shuffle_send_rows += src.shuffle_send_rows;
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index e9ced523912..755a3a042db 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -35,6 +35,8 @@ message PQueryStatistics {
     optional int64 cpu_ms = 4;
     optional int64 max_peak_memory_bytes = 5;
     repeated PNodeStatistics nodes_statistics = 6;
+    optional int64 scan_bytes_from_local_storage = 7;
+    optional int64 scan_bytes_from_remote_storage = 8;
 }
 
 message PRowBatch {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 6ed7c23ec3c..23cc6c19e7b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -409,6 +409,8 @@ struct TQueryStatistics {
     7: optional i64 workload_group_id
     8: optional i64 shuffle_send_bytes
     9: optional i64 shuffle_send_rows
+    10: optional i64 scan_bytes_from_local_storage
+    11: optional i64 scan_bytes_from_remote_storage
 }
 
 struct TReportWorkloadRuntimeStatusParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to