This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 441125652c5 [Chore](statistic) do not use memory order relaxed on QueryStatistics and add sync on te… (#38048) 441125652c5 is described below commit 441125652c56a1e9caac48240309464f765191e1 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jul 22 10:56:56 2024 +0800 [Chore](statistic) do not use memory order relaxed on QueryStatistics and add sync on te… (#38048) ## Proposed changes 1. do not use memory order relaxed on QueryStatistics 2. remove some unused code 3. add sync to test dry_run --- be/src/runtime/query_statistics.cpp | 75 +++++------------- be/src/runtime/query_statistics.h | 90 ++++------------------ .../suites/query_p0/dry_run/dry_run.groovy | 2 + 3 files changed, 33 insertions(+), 134 deletions(-) diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index 126fb10af5b..110efef5ab9 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -27,22 +27,20 @@ namespace doris { void QueryStatistics::merge(const QueryStatistics& other) { - scan_rows += other.scan_rows.load(std::memory_order_relaxed); - scan_bytes += other.scan_bytes.load(std::memory_order_relaxed); - cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed); - shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed); - shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed); - _scan_bytes_from_local_storage += - other._scan_bytes_from_local_storage.load(std::memory_order_relaxed); - _scan_bytes_from_remote_storage += - other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed); - - int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed); + scan_rows += other.scan_rows; + scan_bytes += other.scan_bytes; + cpu_nanos += other.cpu_nanos; + shuffle_send_bytes += other.shuffle_send_bytes; + shuffle_send_rows += other.shuffle_send_rows; + _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage; + _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage; + + int64_t other_peak_mem = other.max_peak_memory_bytes; if (other_peak_mem > this->max_peak_memory_bytes) { this->max_peak_memory_bytes = other_peak_mem; } - int64_t other_memory_used = other.current_used_memory_bytes.load(std::memory_order_relaxed); + int64_t other_memory_used = other.current_used_memory_bytes; if (other_memory_used > 0) { this->current_used_memory_bytes = other_memory_used; } @@ -61,15 +59,14 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { DCHECK(statistics != nullptr); - statistics->__set_scan_bytes(scan_bytes.load(std::memory_order_relaxed)); - statistics->__set_scan_rows(scan_rows.load(std::memory_order_relaxed)); - statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) / NANOS_PER_MILLIS); - statistics->__set_returned_rows(returned_rows.load(std::memory_order_relaxed)); - statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed)); - statistics->__set_current_used_memory_bytes( - current_used_memory_bytes.load(std::memory_order_relaxed)); - statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed)); - statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed)); + statistics->__set_scan_bytes(scan_bytes); + statistics->__set_scan_rows(scan_rows); + statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); + statistics->__set_returned_rows(returned_rows); + statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes); + statistics->__set_current_used_memory_bytes(current_used_memory_bytes); + statistics->__set_shuffle_send_bytes(shuffle_send_bytes); + statistics->__set_shuffle_send_rows(shuffle_send_rows); statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); } @@ -82,42 +79,6 @@ void QueryStatistics::from_pb(const PQueryStatistics& statistics) { _scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage(); } -void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { - recvr->merge(this); -} - -void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) { - DCHECK(recvr != nullptr); - auto QueryStatisticsptr = recvr->find(sender_id); - if (QueryStatisticsptr) { - merge(*QueryStatisticsptr); - } -} - QueryStatistics::~QueryStatistics() {} -void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { - std::lock_guard<std::mutex> l(_lock); - if (!_query_statistics.contains(sender_id)) { - _query_statistics[sender_id] = std::make_shared<QueryStatistics>(); - } - _query_statistics[sender_id]->from_pb(statistics); -} - -void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) { - if (!statistics->collected()) return; - if (_query_statistics.contains(sender_id)) return; - std::lock_guard<std::mutex> l(_lock); - _query_statistics[sender_id] = statistics; -} - -QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) { - std::lock_guard<std::mutex> l(_lock); - auto it = _query_statistics.find(sender_id); - if (it != _query_statistics.end()) { - return it->second; - } - return nullptr; -} - } // namespace doris diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index e71a136789a..0a19dfd46f0 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -31,7 +31,6 @@ namespace doris { -class QueryStatisticsRecvr; class PNodeStatistics; class PQueryStatistics; @@ -53,82 +52,44 @@ public: void merge(const QueryStatistics& other); - void add_scan_rows(int64_t delta_scan_rows) { - this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed); - } + void add_scan_rows(int64_t delta_scan_rows) { scan_rows += delta_scan_rows; } - void add_scan_bytes(int64_t delta_scan_bytes) { - this->scan_bytes.fetch_add(delta_scan_bytes, std::memory_order_relaxed); - } + void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes += delta_scan_bytes; } - void add_cpu_nanos(int64_t delta_cpu_time) { - this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed); - } + void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; } - void add_shuffle_send_bytes(int64_t delta_bytes) { - this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed); - } + void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes += delta_bytes; } - void add_shuffle_send_rows(int64_t delta_rows) { - this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed); - } + void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows += delta_rows; } void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) { - this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage; + _scan_bytes_from_local_storage += scan_bytes_from_local_storage; } void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) { - this->_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage; + _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage; } - void add_returned_rows(int64_t num_rows) { - this->returned_rows.fetch_add(num_rows, std::memory_order_relaxed); - } + void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; } void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { - this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed); + this->max_peak_memory_bytes = max_peak_memory_bytes; } void set_current_used_memory_bytes(int64_t current_used_memory) { - this->current_used_memory_bytes.store(current_used_memory, std::memory_order_relaxed); - } - - void merge(QueryStatisticsRecvr* recvr); - - void merge(QueryStatisticsRecvr* recvr, int sender_id); - - void clearNodeStatistics(); - - void clear() { - scan_rows.store(0, std::memory_order_relaxed); - scan_bytes.store(0, std::memory_order_relaxed); - cpu_nanos.store(0, std::memory_order_relaxed); - shuffle_send_bytes.store(0, std::memory_order_relaxed); - shuffle_send_rows.store(0, std::memory_order_relaxed); - _scan_bytes_from_local_storage.store(0); - _scan_bytes_from_remote_storage.store(0); - - returned_rows.store(0, std::memory_order_relaxed); - max_peak_memory_bytes.store(0, std::memory_order_relaxed); - clearNodeStatistics(); - //clear() is used before collection, so calling "clear" is equivalent to being collected. - set_collected(); + current_used_memory_bytes = current_used_memory; } void to_pb(PQueryStatistics* statistics); void to_thrift(TQueryStatistics* statistics) const; void from_pb(const PQueryStatistics& statistics); bool collected() const { return _collected; } - void set_collected() { _collected = true; } - int64_t get_scan_rows() { return scan_rows.load(std::memory_order_relaxed); } - int64_t get_scan_bytes() { return scan_bytes.load(std::memory_order_relaxed); } - int64_t get_current_used_memory_bytes() { - return current_used_memory_bytes.load(std::memory_order_relaxed); - } + int64_t get_scan_rows() { return scan_rows; } + int64_t get_scan_bytes() { return scan_bytes; } + int64_t get_current_used_memory_bytes() { return current_used_memory_bytes; } private: - friend class QueryStatisticsRecvr; std::atomic<int64_t> scan_rows; std::atomic<int64_t> scan_bytes; std::atomic<int64_t> cpu_nanos; @@ -148,30 +109,5 @@ private: }; using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>; // It is used for collecting sub plan query statistics in DataStreamRecvr. -class QueryStatisticsRecvr { -public: - ~QueryStatisticsRecvr() = default; - - // Transmitted via RPC, incurring serialization overhead. - void insert(const PQueryStatistics& statistics, int sender_id); - - // using local_exchange for transmission, only need to hold a shared pointer. - void insert(QueryStatisticsPtr statistics, int sender_id); - - QueryStatisticsPtr find(int sender_id); - -private: - friend class QueryStatistics; - - void merge(QueryStatistics* statistics) { - std::lock_guard<std::mutex> l(_lock); - for (auto& pair : _query_statistics) { - statistics->merge(*(pair.second)); - } - } - - std::map<int, QueryStatisticsPtr> _query_statistics; - std::mutex _lock; -}; } // namespace doris diff --git a/regression-test/suites/query_p0/dry_run/dry_run.groovy b/regression-test/suites/query_p0/dry_run/dry_run.groovy index 19fc6e011d0..98a7d14d713 100644 --- a/regression-test/suites/query_p0/dry_run/dry_run.groovy +++ b/regression-test/suites/query_p0/dry_run/dry_run.groovy @@ -38,6 +38,8 @@ suite ("dry_run") { sql "insert into d_table select -4,-4,-4,'d';" sql "insert into d_table(k4,k2) values('d',4);" + sql "sync" + sql "set dry_run_query=true;" qt_select_star "select * from d_table;" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org