This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new da1cd324ae0 (cloud-merge) Add file cache stats for queries to audit log (#34140) da1cd324ae0 is described below commit da1cd324ae0d74b3241d5893a857a91bb578a70b Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Sat Apr 27 13:54:41 2024 +0800 (cloud-merge) Add file cache stats for queries to audit log (#34140) --- be/src/runtime/query_statistics.cpp | 10 ++++++++++ be/src/runtime/query_statistics.h | 12 ++++++++++++ be/src/vec/exec/scan/new_olap_scanner.cpp | 6 ++++++ be/test/vec/olap/vertical_compaction_test.cpp | 4 ---- .../java/org/apache/doris/plugin/audit/AuditEvent.java | 14 ++++++++++++++ .../src/main/java/org/apache/doris/qe/AuditLogHelper.java | 6 +++++- .../workloadschedpolicy/WorkloadRuntimeStatusMgr.java | 4 ++++ gensrc/proto/data.proto | 2 ++ gensrc/thrift/FrontendService.thrift | 2 ++ 9 files changed, 55 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index de950704180..551f518f22f 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -32,6 +32,10 @@ void QueryStatistics::merge(const QueryStatistics& other) { cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed); shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed); shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed); + _scan_bytes_from_local_storage += + other._scan_bytes_from_local_storage.load(std::memory_order_relaxed); + _scan_bytes_from_remote_storage += + other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed); int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed); if (other_peak_mem > this->max_peak_memory_bytes) { @@ -51,6 +55,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); statistics->set_returned_rows(returned_rows); statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); + statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); + statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); } void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { @@ -64,12 +70,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { current_used_memory_bytes.load(std::memory_order_relaxed)); statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed)); statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed)); + statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); + statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { scan_rows = statistics.scan_rows(); scan_bytes = statistics.scan_bytes(); cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS; + _scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage(); + _scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage(); } void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index a9f6e192ec0..0a1c5c9f7ba 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -73,6 +73,14 @@ public: this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed); } + void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) { + this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage; + } + + void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) { + this->_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage; + } + void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; } void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { @@ -95,6 +103,8 @@ public: cpu_nanos.store(0, std::memory_order_relaxed); shuffle_send_bytes.store(0, std::memory_order_relaxed); shuffle_send_rows.store(0, std::memory_order_relaxed); + _scan_bytes_from_local_storage.store(0); + _scan_bytes_from_remote_storage.store(0); returned_rows = 0; max_peak_memory_bytes.store(0, std::memory_order_relaxed); @@ -120,6 +130,8 @@ private: std::atomic<int64_t> scan_rows; std::atomic<int64_t> scan_bytes; std::atomic<int64_t> cpu_nanos; + std::atomic<int64_t> _scan_bytes_from_local_storage; + std::atomic<int64_t> _scan_bytes_from_remote_storage; // number rows returned by query. // only set once by result sink when closing. int64_t returned_rows; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index c0bef6b3d8a..f65f814f569 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -688,6 +688,12 @@ void NewOlapScanner::_collect_profile_before_close() { tablet->query_scan_bytes->increment(_compressed_bytes_read); tablet->query_scan_rows->increment(_raw_rows_read); tablet->query_scan_count->increment(1); + if (_query_statistics) { + _query_statistics->add_scan_bytes_from_local_storage( + stats.file_cache_stats.bytes_read_from_local); + _query_statistics->add_scan_bytes_from_remote_storage( + stats.file_cache_stats.bytes_read_from_remote); + } } } // namespace doris::vectorized diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index a98f9c2944c..a4feb3db535 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -97,10 +97,6 @@ protected: auto engine = std::make_unique<StorageEngine>(options); engine_ref = engine.get(); ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); - io::FileCacheSettings cache_setting; - ASSERT_TRUE(io::FileCacheFactory::instance() - ->create_file_cache(absolute_dir + "/tablet_path", cache_setting) - .ok()); _data_dir = new DataDir(*engine_ref, absolute_dir, 100000000); static_cast<void>(_data_dir->init()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java index 6a5fe19fcc6..ac68b38e258 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java @@ -107,6 +107,10 @@ public class AuditEvent { // note: newly added fields should be always before fuzzyVariables @AuditField(value = "FuzzyVariables") public String fuzzyVariables = ""; + @AuditField(value = "scanBytesFromLocalStorage") + public long scanBytesFromLocalStorage = -1; + @AuditField(value = "scanBytesFromRemoteStorage") + public long scanBytesFromRemoteStorage = -1; public long pushToAuditLogQueueTime; @@ -251,6 +255,16 @@ public class AuditEvent { return this; } + public AuditEventBuilder setScanBytesFromLocalStorage(long scanBytesFromLocalStorage) { + auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage; + return this; + } + + public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteStorage) { + auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage; + return this; + } + public AuditEvent build() { return this.auditEvent; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 17a0abe3468..edad7780512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -111,7 +111,11 @@ public class AuditLogHelper { auditEventBuilder.setSqlDigest(sqlDigest); } } - auditEventBuilder.setIsQuery(true); + auditEventBuilder.setIsQuery(true) + .setScanBytesFromLocalStorage( + statistics == null ? 0 : statistics.getScanBytesFromLocalStorage()) + .setScanBytesFromRemoteStorage( + statistics == null ? 0 : statistics.getScanBytesFromRemoteStorage()); } else { auditEventBuilder.setIsQuery(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index ff2641d5f3c..983643ec49b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -68,6 +68,8 @@ public class WorkloadRuntimeStatusMgr { if (queryStats != null) { auditEvent.scanRows = queryStats.scan_rows; auditEvent.scanBytes = queryStats.scan_bytes; + auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage; + auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage; auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; auditEvent.cpuTimeMs = queryStats.cpu_ms; auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; @@ -176,6 +178,8 @@ public class WorkloadRuntimeStatusMgr { private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { dst.scan_rows += src.scan_rows; dst.scan_bytes += src.scan_bytes; + dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage; + dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage; dst.cpu_ms += src.cpu_ms; dst.shuffle_send_bytes += src.shuffle_send_bytes; dst.shuffle_send_rows += src.shuffle_send_rows; diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index e9ced523912..755a3a042db 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -35,6 +35,8 @@ message PQueryStatistics { optional int64 cpu_ms = 4; optional int64 max_peak_memory_bytes = 5; repeated PNodeStatistics nodes_statistics = 6; + optional int64 scan_bytes_from_local_storage = 7; + optional int64 scan_bytes_from_remote_storage = 8; } message PRowBatch { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 6ed7c23ec3c..23cc6c19e7b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -409,6 +409,8 @@ struct TQueryStatistics { 7: optional i64 workload_group_id 8: optional i64 shuffle_send_bytes 9: optional i64 shuffle_send_rows + 10: optional i64 scan_bytes_from_local_storage + 11: optional i64 scan_bytes_from_remote_storage } struct TReportWorkloadRuntimeStatusParams { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org