This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 441125652c5 [Chore](statistic) do not use memory order relaxed on QueryStatistics and add sync on te… (#38048)
441125652c5 is described below

commit 441125652c56a1e9caac48240309464f765191e1
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Jul 22 10:56:56 2024 +0800

    [Chore](statistic) do not use memory order relaxed on QueryStatistics and add sync on te… (#38048)
    
    ## Proposed changes
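    1. Do not use `std::memory_order_relaxed` on the `QueryStatistics` atomic counters; use the default `std::atomic` operations instead.
    2. Remove unused code, including `QueryStatisticsRecvr`, the `merge` overloads that take it, `clear()`, and `set_collected()`.
    3. Add a `sync` statement to the `dry_run` regression test before the query runs.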
---
 be/src/runtime/query_statistics.cpp                | 75 +++++-------------
 be/src/runtime/query_statistics.h                  | 90 ++++------------------
 .../suites/query_p0/dry_run/dry_run.groovy         |  2 +
 3 files changed, 33 insertions(+), 134 deletions(-)

diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp
index 126fb10af5b..110efef5ab9 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -27,22 +27,20 @@
 namespace doris {
 
 void QueryStatistics::merge(const QueryStatistics& other) {
-    scan_rows += other.scan_rows.load(std::memory_order_relaxed);
-    scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
-    cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
-    shuffle_send_bytes += 
other.shuffle_send_bytes.load(std::memory_order_relaxed);
-    shuffle_send_rows += 
other.shuffle_send_rows.load(std::memory_order_relaxed);
-    _scan_bytes_from_local_storage +=
-            
other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
-    _scan_bytes_from_remote_storage +=
-            
other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);
-
-    int64_t other_peak_mem = 
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
+    scan_rows += other.scan_rows;
+    scan_bytes += other.scan_bytes;
+    cpu_nanos += other.cpu_nanos;
+    shuffle_send_bytes += other.shuffle_send_bytes;
+    shuffle_send_rows += other.shuffle_send_rows;
+    _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
+    _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
+
+    int64_t other_peak_mem = other.max_peak_memory_bytes;
     if (other_peak_mem > this->max_peak_memory_bytes) {
         this->max_peak_memory_bytes = other_peak_mem;
     }
 
-    int64_t other_memory_used = 
other.current_used_memory_bytes.load(std::memory_order_relaxed);
+    int64_t other_memory_used = other.current_used_memory_bytes;
     if (other_memory_used > 0) {
         this->current_used_memory_bytes = other_memory_used;
     }
@@ -61,15 +59,14 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
 
 void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
     DCHECK(statistics != nullptr);
-    statistics->__set_scan_bytes(scan_bytes.load(std::memory_order_relaxed));
-    statistics->__set_scan_rows(scan_rows.load(std::memory_order_relaxed));
-    statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) / 
NANOS_PER_MILLIS);
-    
statistics->__set_returned_rows(returned_rows.load(std::memory_order_relaxed));
-    
statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
-    statistics->__set_current_used_memory_bytes(
-            current_used_memory_bytes.load(std::memory_order_relaxed));
-    
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
-    
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+    statistics->__set_scan_bytes(scan_bytes);
+    statistics->__set_scan_rows(scan_rows);
+    statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
+    statistics->__set_returned_rows(returned_rows);
+    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
+    statistics->__set_current_used_memory_bytes(current_used_memory_bytes);
+    statistics->__set_shuffle_send_bytes(shuffle_send_bytes);
+    statistics->__set_shuffle_send_rows(shuffle_send_rows);
     
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
     
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
 }
@@ -82,42 +79,6 @@ void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
     _scan_bytes_from_remote_storage = 
statistics.scan_bytes_from_remote_storage();
 }
 
-void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
-    recvr->merge(this);
-}
-
-void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
-    DCHECK(recvr != nullptr);
-    auto QueryStatisticsptr = recvr->find(sender_id);
-    if (QueryStatisticsptr) {
-        merge(*QueryStatisticsptr);
-    }
-}
-
 QueryStatistics::~QueryStatistics() {}
 
-void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int 
sender_id) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_query_statistics.contains(sender_id)) {
-        _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
-    }
-    _query_statistics[sender_id]->from_pb(statistics);
-}
-
-void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int 
sender_id) {
-    if (!statistics->collected()) return;
-    if (_query_statistics.contains(sender_id)) return;
-    std::lock_guard<std::mutex> l(_lock);
-    _query_statistics[sender_id] = statistics;
-}
-
-QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
-    std::lock_guard<std::mutex> l(_lock);
-    auto it = _query_statistics.find(sender_id);
-    if (it != _query_statistics.end()) {
-        return it->second;
-    }
-    return nullptr;
-}
-
 } // namespace doris
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index e71a136789a..0a19dfd46f0 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -31,7 +31,6 @@
 
 namespace doris {
 
-class QueryStatisticsRecvr;
 class PNodeStatistics;
 class PQueryStatistics;
 
@@ -53,82 +52,44 @@ public:
 
     void merge(const QueryStatistics& other);
 
-    void add_scan_rows(int64_t delta_scan_rows) {
-        this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
-    }
+    void add_scan_rows(int64_t delta_scan_rows) { scan_rows += 
delta_scan_rows; }
 
-    void add_scan_bytes(int64_t delta_scan_bytes) {
-        this->scan_bytes.fetch_add(delta_scan_bytes, 
std::memory_order_relaxed);
-    }
+    void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes += 
delta_scan_bytes; }
 
-    void add_cpu_nanos(int64_t delta_cpu_time) {
-        this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
-    }
+    void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; }
 
-    void add_shuffle_send_bytes(int64_t delta_bytes) {
-        this->shuffle_send_bytes.fetch_add(delta_bytes, 
std::memory_order_relaxed);
-    }
+    void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes += 
delta_bytes; }
 
-    void add_shuffle_send_rows(int64_t delta_rows) {
-        this->shuffle_send_rows.fetch_add(delta_rows, 
std::memory_order_relaxed);
-    }
+    void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows += 
delta_rows; }
 
     void add_scan_bytes_from_local_storage(int64_t 
scan_bytes_from_local_storage) {
-        this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+        _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
     }
 
     void add_scan_bytes_from_remote_storage(int64_t 
scan_bytes_from_remote_storage) {
-        this->_scan_bytes_from_remote_storage += 
scan_bytes_from_remote_storage;
+        _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
     }
 
-    void add_returned_rows(int64_t num_rows) {
-        this->returned_rows.fetch_add(num_rows, std::memory_order_relaxed);
-    }
+    void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; }
 
     void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
-        this->max_peak_memory_bytes.store(max_peak_memory_bytes, 
std::memory_order_relaxed);
+        this->max_peak_memory_bytes = max_peak_memory_bytes;
     }
 
     void set_current_used_memory_bytes(int64_t current_used_memory) {
-        this->current_used_memory_bytes.store(current_used_memory, 
std::memory_order_relaxed);
-    }
-
-    void merge(QueryStatisticsRecvr* recvr);
-
-    void merge(QueryStatisticsRecvr* recvr, int sender_id);
-
-    void clearNodeStatistics();
-
-    void clear() {
-        scan_rows.store(0, std::memory_order_relaxed);
-        scan_bytes.store(0, std::memory_order_relaxed);
-        cpu_nanos.store(0, std::memory_order_relaxed);
-        shuffle_send_bytes.store(0, std::memory_order_relaxed);
-        shuffle_send_rows.store(0, std::memory_order_relaxed);
-        _scan_bytes_from_local_storage.store(0);
-        _scan_bytes_from_remote_storage.store(0);
-
-        returned_rows.store(0, std::memory_order_relaxed);
-        max_peak_memory_bytes.store(0, std::memory_order_relaxed);
-        clearNodeStatistics();
-        //clear() is used before collection, so calling "clear" is equivalent 
to being collected.
-        set_collected();
+        current_used_memory_bytes = current_used_memory;
     }
 
     void to_pb(PQueryStatistics* statistics);
     void to_thrift(TQueryStatistics* statistics) const;
     void from_pb(const PQueryStatistics& statistics);
     bool collected() const { return _collected; }
-    void set_collected() { _collected = true; }
 
-    int64_t get_scan_rows() { return 
scan_rows.load(std::memory_order_relaxed); }
-    int64_t get_scan_bytes() { return 
scan_bytes.load(std::memory_order_relaxed); }
-    int64_t get_current_used_memory_bytes() {
-        return current_used_memory_bytes.load(std::memory_order_relaxed);
-    }
+    int64_t get_scan_rows() { return scan_rows; }
+    int64_t get_scan_bytes() { return scan_bytes; }
+    int64_t get_current_used_memory_bytes() { return 
current_used_memory_bytes; }
 
 private:
-    friend class QueryStatisticsRecvr;
     std::atomic<int64_t> scan_rows;
     std::atomic<int64_t> scan_bytes;
     std::atomic<int64_t> cpu_nanos;
@@ -148,30 +109,5 @@ private:
 };
 using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
-class QueryStatisticsRecvr {
-public:
-    ~QueryStatisticsRecvr() = default;
-
-    // Transmitted via RPC, incurring serialization overhead.
-    void insert(const PQueryStatistics& statistics, int sender_id);
-
-    // using local_exchange for transmission, only need to hold a shared 
pointer.
-    void insert(QueryStatisticsPtr statistics, int sender_id);
-
-    QueryStatisticsPtr find(int sender_id);
-
-private:
-    friend class QueryStatistics;
-
-    void merge(QueryStatistics* statistics) {
-        std::lock_guard<std::mutex> l(_lock);
-        for (auto& pair : _query_statistics) {
-            statistics->merge(*(pair.second));
-        }
-    }
-
-    std::map<int, QueryStatisticsPtr> _query_statistics;
-    std::mutex _lock;
-};
 
 } // namespace doris
diff --git a/regression-test/suites/query_p0/dry_run/dry_run.groovy b/regression-test/suites/query_p0/dry_run/dry_run.groovy
index 19fc6e011d0..98a7d14d713 100644
--- a/regression-test/suites/query_p0/dry_run/dry_run.groovy
+++ b/regression-test/suites/query_p0/dry_run/dry_run.groovy
@@ -38,6 +38,8 @@ suite ("dry_run") {
     sql "insert into d_table select -4,-4,-4,'d';"
     sql "insert into d_table(k4,k2) values('d',4);"
 
+    sql "sync"
+
     sql "set dry_run_query=true;"
     qt_select_star "select * from d_table;"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to