This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fdd98d7eceb [Feature](profile)Support report runtime workload statistics #29591
fdd98d7eceb is described below

commit fdd98d7ecebd35f58cd32bb301c3d1b6eb73e3c7
Author: wangbo <[email protected]>
AuthorDate: Thu Jan 11 09:43:43 2024 +0800

    [Feature](profile)Support report runtime workload statistics #29591
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   2 +
 be/src/common/daemon.cpp                           |  13 ++
 be/src/common/daemon.h                             |   1 +
 be/src/exec/data_sink.cpp                          |  32 +--
 be/src/exec/data_sink.h                            |   8 -
 be/src/exec/exec_node.cpp                          |  21 +-
 be/src/exec/exec_node.h                            |  11 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |   8 -
 be/src/pipeline/exec/exchange_sink_buffer.h        |   1 -
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   9 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |   4 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |   3 +-
 be/src/pipeline/exec/exchange_source_operator.h    |   5 -
 be/src/pipeline/exec/operator.h                    |  21 --
 be/src/pipeline/exec/result_file_sink_operator.cpp |  13 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   2 -
 be/src/pipeline/exec/result_sink_operator.cpp      |   3 +-
 be/src/pipeline/exec/scan_operator.cpp             |   3 +
 be/src/pipeline/pipeline.h                         |   7 -
 be/src/pipeline/pipeline_fragment_context.cpp      |   5 +-
 be/src/pipeline/pipeline_fragment_context.h        |  12 --
 be/src/pipeline/pipeline_task.cpp                  |  26 ---
 be/src/pipeline/pipeline_task.h                    |   3 -
 be/src/pipeline/pipeline_x/operator.cpp            |   4 +-
 be/src/pipeline/pipeline_x/operator.h              |   7 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  29 +--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   5 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   3 +-
 be/src/runtime/buffer_control_block.cpp            |   4 +-
 be/src/runtime/buffer_control_block.h              |  19 +-
 be/src/runtime/exec_env.h                          |   6 +
 be/src/runtime/exec_env_init.cpp                   |   8 +
 be/src/runtime/fragment_mgr.cpp                    |   1 -
 be/src/runtime/plan_fragment_executor.cpp          |  59 +-----
 be/src/runtime/plan_fragment_executor.h            |  18 +-
 be/src/runtime/query_context.cpp                   |  13 ++
 be/src/runtime/query_context.h                     |   4 +
 be/src/runtime/query_statistics.cpp                |  10 +-
 be/src/runtime/query_statistics.h                  |  23 +--
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 148 ++++++++++++++
 be/src/runtime/runtime_query_statistics_mgr.h      |  58 ++++++
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |  15 --
 be/src/vec/exec/scan/new_olap_scan_node.h          |   2 -
 be/src/vec/exec/scan/vscan_node.cpp                |   3 +
 be/src/vec/exec/scan/vscanner.cpp                  |   7 +
 be/src/vec/exec/scan/vscanner.h                    |   7 +
 be/src/vec/exec/vexchange_node.cpp                 |  17 +-
 be/src/vec/exec/vexchange_node.h                   |   4 -
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  15 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |  11 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  21 +-
 be/src/vec/runtime/vdata_stream_recvr.h            |  13 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  39 +---
 be/src/vec/sink/vdata_stream_sender.h              |  22 +-
 be/src/vec/sink/vresult_file_sink.cpp              |  15 +-
 be/src/vec/sink/vresult_file_sink.h                |   4 -
 be/src/vec/sink/vresult_sink.cpp                   |   8 +-
 be/src/vec/sink/vresult_sink.h                     |   3 -
 be/test/vec/runtime/vdata_stream_test.cpp          |   7 +-
 .../main/java/org/apache/doris/common/Config.java  |  10 +
 .../main/java/org/apache/doris/catalog/Env.java    |  10 +
 .../java/org/apache/doris/plugin/AuditEvent.java   |   2 +
 .../java/org/apache/doris/qe/AuditLogHelper.java   |   2 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   3 +-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   9 +
 .../WorkloadRuntimeStatusMgr.java                  | 223 +++++++++++++++++++++
 gensrc/thrift/FrontendService.thrift               |   7 +
 68 files changed, 649 insertions(+), 464 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9f468e3edc2..e003a07f613 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1161,6 +1161,8 @@ DEFINE_mInt64(enable_debug_log_timeout_secs, "0");
 // Tolerance for the number of partition id 0 in rowset, default 0
 DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");
 
+DEFINE_mInt32(report_query_statistics_interval_ms, "3000");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 97f4dd5535d..28460298788 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1237,6 +1237,8 @@ DECLARE_mBool(enable_column_type_check);
 // Tolerance for the number of partition id 0 in rowset, default 0
 DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
 
+DECLARE_mInt32(report_query_statistics_interval_ms);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index e3bf1a738b8..5274808c9b8 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -45,9 +45,12 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "runtime/block_spill_manager.h"
+#include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/task_group/task_group_manager.h"
 #include "util/cpu_info.h"
 #include "util/debug_util.h"
@@ -352,6 +355,13 @@ void Daemon::block_spill_gc_thread() {
     }
 }
 
+void Daemon::report_runtime_query_statistics_thread() {
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
+        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
+    }
+}
+
 void Daemon::je_purge_dirty_pages_thread() const {
     do {
         std::unique_lock<std::mutex> 
l(doris::MemInfo::je_purge_dirty_pages_lock);
@@ -399,6 +409,9 @@ void Daemon::start() {
     st = Thread::create(
             "Daemon", "je_purge_dirty_pages_thread",
             [this]() { this->je_purge_dirty_pages_thread(); }, 
&_threads.emplace_back());
+    st = Thread::create(
+            "Daemon", "query_runtime_statistics_thread",
+            [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
     CHECK(st.ok()) << st;
 }
 
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 18f78cbe583..e88dd737643 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -44,6 +44,7 @@ private:
     void calculate_metrics_thread();
     void block_spill_gc_thread();
     void je_purge_dirty_pages_thread() const;
+    void report_runtime_query_statistics_thread();
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 2742ccd163e..c58bbdb2523 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -54,14 +54,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.stream_sink) {
             return Status::InternalError("Missing data stream sink.");
         }
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // TODO: figure out good buffer size based on size of output row
         sink->reset(new vectorized::VDataStreamSender(state, pool, 
params.sender_id, row_desc,
-                                                      thrift_sink.stream_sink, 
params.destinations,
-                                                      
send_query_statistics_with_every_batch));
+                                                      thrift_sink.stream_sink,
+                                                      params.destinations));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), 
thrift_sink.stream_sink));
         break;
     }
@@ -82,16 +78,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                     state, pool, params.sender_id, row_desc, 
thrift_sink.result_file_sink,
-                    params.destinations, 
send_query_statistics_with_every_batch, output_exprs,
-                    desc_tbl));
+                    params.destinations, output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(row_desc, 
output_exprs));
         }
@@ -201,14 +192,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.stream_sink) {
             return Status::InternalError("Missing data stream sink.");
         }
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // TODO: figure out good buffer size based on size of output row
-        *sink = std::make_unique<vectorized::VDataStreamSender>(
-                state, pool, local_params.sender_id, row_desc, 
thrift_sink.stream_sink,
-                params.destinations, send_query_statistics_with_every_batch);
+        *sink = std::make_unique<vectorized::VDataStreamSender>(state, pool, local_params.sender_id,
+                                                                row_desc, thrift_sink.stream_sink,
+                                                                params.destinations);
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), 
thrift_sink.stream_sink));
         break;
     }
@@ -229,16 +216,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                     state, pool, local_params.sender_id, row_desc, 
thrift_sink.result_file_sink,
-                    params.destinations, 
send_query_statistics_with_every_batch, output_exprs,
-                    desc_tbl));
+                    params.destinations, output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(row_desc, 
output_exprs));
         }
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 6faef8f8a78..3bf72ae5450 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -36,7 +36,6 @@ class ObjectPool;
 class RuntimeState;
 class TPlanFragmentExecParams;
 class DescriptorTbl;
-class QueryStatistics;
 class TDataSink;
 class TExpr;
 class TPipelineFragmentParams;
@@ -104,10 +103,6 @@ public:
     // Returns the runtime profile for the sink.
     RuntimeProfile* profile() { return _profile; }
 
-    virtual void set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-        _query_statistics = statistics;
-    }
-
     const RowDescriptor& row_desc() { return _row_desc; }
 
     virtual bool can_write() { return true; }
@@ -124,9 +119,6 @@ protected:
 
     RuntimeProfile* _profile = nullptr; // Allocated from _pool
 
-    // Maybe this will be transferred to BufferControlBlock.
-    std::shared_ptr<QueryStatistics> _query_statistics;
-
     RuntimeProfile::Counter* _exec_timer = nullptr;
     RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
     RuntimeProfile::Counter* _output_rows_counter = nullptr;
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 7d38ee5e651..6bc6e07c563 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -74,7 +74,6 @@
 #include "vec/utils/util.hpp"
 
 namespace doris {
-class QueryStatistics;
 
 const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsProducedRate";
 
@@ -96,6 +95,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, 
const DescriptorTbl
     if (tnode.__isset.output_tuple_id) {
         _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
     }
+    _query_statistics = std::make_shared<QueryStatistics>();
 }
 
 ExecNode::~ExecNode() = default;
@@ -176,22 +176,6 @@ Status ExecNode::reset(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
-    DCHECK(statistics != nullptr);
-    for (auto child_node : _children) {
-        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
-    }
-    return Status::OK();
-}
-
-Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
-    DCHECK(statistics != nullptr);
-    for (auto child_node : _children) {
-        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, 
sender_id));
-    }
-    return Status::OK();
-}
-
 void ExecNode::release_resource(doris::RuntimeState* state) {
     if (!_is_resource_released) {
         if (_rows_returned_counter != nullptr) {
@@ -276,6 +260,9 @@ Status ExecNode::create_tree_helper(RuntimeState* state, 
ObjectPool* pool,
     // Step 1 Create current ExecNode according to current thrift plan node.
     ExecNode* cur_exec_node = nullptr;
     RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, 
&cur_exec_node));
+    if (cur_exec_node != nullptr) {
+        state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics());
+    }
 
     // Step 1.1
     // Record current node if we have parent or record myself as root node.
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index aaa2f6ee07f..f4b49cba6f5 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -156,13 +156,6 @@ public:
     // so should be fast.
     [[nodiscard]] virtual Status reset(RuntimeState* state);
 
-    // This should be called before close() and after get_next(), it is 
responsible for
-    // collecting statistics sent with row batch, it can't be called when 
prepare() returns
-    // error.
-    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics);
-
-    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics,
-                                                          int sender_id);
     // close() will get called for every exec node, regardless of what else is 
called and
     // the status of these calls (i.e. prepare() may never have been called, or
     // prepare()/open()/get_next() returned with an error).
@@ -243,6 +236,8 @@ public:
     // such as send the last buffer to remote.
     virtual Status try_close(RuntimeState* state) { return Status::OK(); }
 
+    std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
+
 protected:
     friend class DataSink;
 
@@ -330,6 +325,8 @@ protected:
 
     std::atomic<bool> _can_read = false;
 
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
+
 private:
     static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
                                      const std::vector<TPlanNode>& tnodes,
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1f579b6a971..be5d3e64480 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -255,10 +255,6 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
-        if (_statistics && _statistics->collected()) {
-            auto statistic = brpc_request->mutable_query_statistics();
-            _statistics->to_pb(statistic);
-        }
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
@@ -325,10 +321,6 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         if (request.block_holder->get_block()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
-        if (_statistics && _statistics->collected()) {
-            auto statistic = brpc_request->mutable_query_statistics();
-            _statistics->to_pb(statistic);
-        }
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
 
         ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 83b20f9c8a8..a11b637f4d4 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -215,7 +215,6 @@ public:
         _queue_dependency = queue_dependency;
         _finish_dependency = finish_dependency;
     }
-    void set_query_statistics(QueryStatistics* statistics) { _statistics = 
statistics; }
 
     void set_should_stop() {
         _should_stop = true;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 43bec0bd92d..aa29bb4cf0f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -67,7 +67,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
             id, _dest_node_id, _sink->_sender_id, _state->be_number(), 
state->get_query_ctx());
 
-    _sink_buffer->set_query_statistics(_sink->query_statistics());
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->register_pipeline_channels(_sink_buffer.get());
     return Status::OK();
@@ -135,14 +134,12 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
 
     std::map<int64_t, int64_t> fragment_id_to_channel_index;
     for (int i = 0; i < p._dests.size(); ++i) {
-        // Select first dest as transfer chain.
-        bool is_transfer_chain = (i == 0);
         const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
             channel_shared_ptrs.emplace_back(new 
vectorized::PipChannel<ExchangeSinkLocalState>(
                     this, p._row_desc, p._dests[i].brpc_server, 
fragment_instance_id,
-                    p._dest_node_id, is_transfer_chain, 
p._send_query_statistics_with_every_batch));
+                    p._dest_node_id));
             fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                  channel_shared_ptrs.size() - 
1);
             channels.push_back(channel_shared_ptrs.back().get());
@@ -258,14 +255,12 @@ segment_v2::CompressionTypePB 
ExchangeSinkLocalState::compression_type() const {
 
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
-        const TDataStreamSink& sink, const 
std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch)
+        const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations)
         : DataSinkOperatorX(operator_id, sink.dest_node_id),
           _texprs(sink.output_partition.partition_exprs),
           _row_desc(row_desc),
           _part_type(sink.output_partition.type),
           _dests(destinations),
-          
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
           _dest_node_id(sink.dest_node_id),
           _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 24fe9e1d84c..6d1d1b6a4fe 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -203,8 +203,7 @@ class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalSt
 public:
     ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, 
int operator_id,
                           const TDataStreamSink& sink,
-                          const std::vector<TPlanFragmentDestination>& 
destinations,
-                          bool send_query_statistics_with_every_batch);
+                          const std::vector<TPlanFragmentDestination>& destinations);
     Status init(const TDataSink& tsink) override;
 
     RuntimeState* state() { return _state; }
@@ -244,7 +243,6 @@ private:
     PBlock _pb_block2;
 
     const std::vector<TPlanFragmentDestination> _dests;
-    const bool _send_query_statistics_with_every_batch;
 
     std::unique_ptr<MemTracker> _mem_tracker;
     // Identifier of the destination plan node.
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 255cb151410..73aa7a22e81 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -73,7 +73,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
-            profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
+            profile(), p.is_merging());
     auto* source_dependency = _dependency;
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
@@ -133,7 +133,6 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state
 Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
     DCHECK_GT(_num_senders, 0);
-    _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
 
     if (_is_merging) {
         RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, 
_row_descriptor));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index bbccfe52987..3340691f7ee 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -113,10 +113,6 @@ public:
     [[nodiscard]] int num_senders() const { return _num_senders; }
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
-    std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
-        return _sub_plan_query_statistics_recvr;
-    }
-
     DataDistribution required_data_distribution() const override {
         if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
             return {ExchangeType::NOOP};
@@ -134,7 +130,6 @@ private:
     const bool _is_merging;
     const TPartitionType::type _partition_type;
     RowDescriptor _input_row_desc;
-    std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bb0b8f30911..80184374b77 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -174,14 +174,6 @@ public:
 
     virtual bool is_source() const;
 
-    virtual Status collect_query_statistics(QueryStatistics* statistics) { 
return Status::OK(); };
-
-    virtual Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
-        return Status::OK();
-    };
-
-    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {};
-
     virtual Status init(const TDataSink& tsink) { return Status::OK(); }
 
     // Prepare for running. (e.g. resource allocation, etc.)
@@ -317,9 +309,6 @@ public:
     }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override {
-        _sink->set_query_statistics(statistics);
-    }
 
 protected:
     DataSinkType* _sink = nullptr;
@@ -385,16 +374,6 @@ public:
         return _node->runtime_profile();
     }
 
-    Status collect_query_statistics(QueryStatistics* statistics) override {
-        RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
-        return Status::OK();
-    }
-
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override {
-        RETURN_IF_ERROR(_node->collect_query_statistics(statistics, 
sender_id));
-        return Status::OK();
-    }
-
 protected:
     StreamingNodeType* _node = nullptr;
     bool _use_projection;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 11045c5f069..2b095748b15 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -59,13 +59,11 @@ ResultFileSinkOperatorX::ResultFileSinkOperatorX(int 
operator_id, const RowDescr
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(
         int operator_id, const RowDescriptor& row_desc, const TResultFileSink& 
sink,
         const std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch, const std::vector<TExpr>& 
t_output_expr,
-        DescriptorTbl& descs)
+        const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
         : DataSinkOperatorX(operator_id, 0),
           _row_desc(row_desc),
           _t_output_expr(t_output_expr),
           _dests(destinations),
-          
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
           
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
           _is_top_sink(false) {
     CHECK_EQ(destinations.size(), 1);
@@ -134,10 +132,9 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
 
         std::map<int64_t, int64_t> fragment_id_to_channel_index;
         for (int i = 0; i < p._dests.size(); ++i) {
-            _channels.push_back(new vectorized::Channel(
-                    this, p._row_desc, p._dests[i].brpc_server, 
state->fragment_instance_id(),
-                    info.tsink.result_file_sink.dest_node_id, false,
-                    p._send_query_statistics_with_every_batch));
+            _channels.push_back(new vectorized::Channel(this, p._row_desc, p._dests[i].brpc_server,
+                                                        state->fragment_instance_id(),
+                                                        info.tsink.result_file_sink.dest_node_id));
         }
         std::random_device rd;
         std::mt19937 g(rd());
@@ -187,7 +184,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
     if (p._is_top_sink) {
         // close sender, this is normal path end
         if (_sender) {
-            _sender->update_num_written_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
+            _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
             static_cast<void>(_sender->close(final_status));
         }
         static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index f064b8f2b7f..57e1e8c9147 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -94,7 +94,6 @@ public:
     ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
                             const TResultFileSink& sink,
                             const std::vector<TPlanFragmentDestination>& 
destinations,
-                            bool send_query_statistics_with_every_batch,
                             const std::vector<TExpr>& t_output_expr, 
DescriptorTbl& descs);
     Status init(const TDataSink& thrift_sink) override;
 
@@ -113,7 +112,6 @@ private:
     const std::vector<TExpr>& _t_output_expr;
 
     const std::vector<TPlanFragmentDestination> _dests;
-    bool _send_query_statistics_with_every_batch;
 
     // set file options when sink type is FILE
     std::unique_ptr<vectorized::ResultFileOptions> _file_opts;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 8dc6eed2998..8b3afb1908f 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -180,9 +180,8 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     // close sender, this is normal path end
     if (_sender) {
         if (_writer) {
-            _sender->update_num_written_rows(_writer->get_written_rows());
+            _sender->update_return_rows(_writer->get_written_rows());
         }
-        _sender->update_max_peak_memory_bytes();
         static_cast<void>(_sender->close(final_status));
     }
     static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 851e0ad3250..7001e2b828a 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1213,6 +1213,9 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
         _eos = true;
         _scan_dependency->set_ready();
     } else {
+        for (auto& scanner : scanners) {
+            scanner->set_query_statistics(_query_statistics.get());
+        }
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(_scanners));
     }
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index ab6850b704b..148191f4e2d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -115,12 +115,6 @@ public:
     [[nodiscard]] PipelineId id() const { return _pipeline_id; }
     void set_is_root_pipeline() { _is_root_pipeline = true; }
     bool is_root_pipeline() const { return _is_root_pipeline; }
-    void set_collect_query_statistics_with_every_batch() {
-        _collect_query_statistics_with_every_batch = true;
-    }
-    [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
-        return _collect_query_statistics_with_every_batch;
-    }
 
     static bool is_hash_exchange(ExchangeType idx) {
         return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
@@ -235,7 +229,6 @@ private:
     bool _always_can_read = false;
     bool _always_can_write = false;
     bool _is_root_pipeline = false;
-    bool _collect_query_statistics_with_every_batch = false;
 
     // Input data distribution of this pipeline. We do local exchange when 
input data distribution
     // does not match the target data distribution.
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 695fd6f4d3d..b9c2382ce86 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -342,7 +342,6 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
 
     _root_pipeline = fragment_context->add_pipeline();
     _root_pipeline->set_is_root_pipeline();
-    _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
         // DataSinkOperator is builded here
@@ -854,7 +853,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
             _multi_cast_stream_sink_senders[i].reset(new 
vectorized::VDataStreamSender(
                     _runtime_state.get(), _runtime_state->obj_pool(), 
sender_id, row_desc,
                     thrift_sink.multi_cast_stream_sink.sinks[i],
-                    thrift_sink.multi_cast_stream_sink.destinations[i], 
false));
+                    thrift_sink.multi_cast_stream_sink.destinations[i]));
 
             // 2. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
@@ -945,7 +944,7 @@ Status PipelineFragmentContext::send_report(bool done) {
              std::bind(&PipelineFragmentContext::update_status, this, 
std::placeholders::_1),
              std::bind(&PipelineFragmentContext::cancel, this, 
std::placeholders::_1,
                        std::placeholders::_2),
-             _dml_query_statistics()},
+             _query_ctx->get_query_statistics()},
             
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 2a3a11d59cc..353e7a06586 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -150,10 +150,6 @@ public:
 
     uint64_t create_time() const { return _create_time; }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> 
query_statistics) {
-        _query_statistics = query_statistics;
-    }
-
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
     Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -230,17 +226,9 @@ protected:
 
 private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-    std::shared_ptr<QueryStatistics> _dml_query_statistics() {
-        if (_query_statistics && _query_statistics->collect_dml_statistics()) {
-            return _query_statistics;
-        }
-        return nullptr;
-    }
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
 
     uint64_t _create_time;
-
-    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 5f5ff56aa4b..32f57c29986 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -58,11 +58,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _root(_operators.back()),
           _sink(sink) {
     _pipeline_task_watcher.start();
-    _query_statistics.reset(new 
QueryStatistics(state->query_options().query_type));
-    _sink->set_query_statistics(_query_statistics);
-    _collect_query_statistics_with_every_batch =
-            _pipeline->collect_query_statistics_with_every_batch();
-    fragment_context->set_query_statistics(_query_statistics);
 }
 
 PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, 
RuntimeState* state,
@@ -285,10 +280,6 @@ Status PipelineTask::execute(bool* eos) {
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            if (_data_state == SourceState::FINISHED ||
-                _collect_query_statistics_with_every_batch) {
-                RETURN_IF_ERROR(_collect_query_statistics());
-            }
             status = _sink->sink(_state, block, _data_state);
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
@@ -303,23 +294,6 @@ Status PipelineTask::execute(bool* eos) {
     return Status::OK();
 }
 
-Status PipelineTask::_collect_query_statistics() {
-    // The execnode tree of a fragment will be split into multiple pipelines, 
we only need to collect the root pipeline.
-    if (_pipeline->is_root_pipeline()) {
-        // If the current fragment has only one instance, we can collect all 
of them;
-        // otherwise, we need to collect them based on the sender_id.
-        if (_state->num_per_fragment_instances() == 1) {
-            _query_statistics->clear();
-            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
-        } else {
-            _query_statistics->clear();
-            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
-                                                            
_state->per_fragment_instance_idx()));
-        }
-    }
-    return Status::OK();
-}
-
 Status PipelineTask::try_close(Status exec_status) {
     if (_try_close_flag) {
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 85951bdb06a..56e42370ff2 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -389,9 +389,6 @@ protected:
     int64_t _close_pipeline_time = 0;
 
     RuntimeProfile::Counter* _pip_task_total_timer = nullptr;
-    std::shared_ptr<QueryStatistics> _query_statistics;
-    Status _collect_query_statistics();
-    bool _collect_query_statistics_with_every_batch = false;
 
 private:
     Operators _operators; // left is _source, right is _root
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index bc16d70b86c..e38e7b39d95 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -316,7 +316,9 @@ 
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
           _rows_returned_counter(nullptr),
           _peak_memory_usage_counter(nullptr),
           _parent(parent),
-          _state(state) {}
+          _state(state) {
+    _query_statistics = std::make_shared<QueryStatistics>();
+}
 
 template <typename DependencyType>
 Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, 
LocalStateInfo& info) {
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index ec2500acf33..5304d0074f6 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -107,6 +107,8 @@ public:
     //  override in Scan  MultiCastSink
     virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
 
+    std::shared_ptr<QueryStatistics> query_statistics_ptr() { return 
_query_statistics; }
+
 protected:
     friend class OperatorXBase;
 
@@ -119,6 +121,8 @@ protected:
     // which will providea reference for operator memory.
     std::unique_ptr<MemTracker> _mem_tracker;
 
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
+
     RuntimeProfile::Counter* _rows_returned_counter = nullptr;
     RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
@@ -401,7 +405,6 @@ public:
     RuntimeState* state() { return _state; }
     RuntimeProfile* profile() { return _profile; }
     MemTracker* mem_tracker() { return _mem_tracker.get(); }
-    QueryStatistics* query_statistics() { return _query_statistics.get(); }
     [[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
         return _faker_runtime_profile.get();
     }
@@ -418,8 +421,6 @@ protected:
     RuntimeState* _state = nullptr;
     RuntimeProfile* _profile = nullptr;
     std::unique_ptr<MemTracker> _mem_tracker;
-    // Maybe this will be transferred to BufferControlBlock.
-    std::shared_ptr<QueryStatistics> _query_statistics;
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
     bool _closed = false;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 6b1d9af512b..4cc00312697 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -327,13 +327,8 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         if (!thrift_sink.__isset.stream_sink) {
             return Status::InternalError("Missing data stream sink.");
         }
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         _sink.reset(new ExchangeSinkOperatorX(state, row_desc, 
next_sink_operator_id(),
-                                              thrift_sink.stream_sink, 
params.destinations,
-                                              
send_query_statistics_with_every_batch));
+                                              thrift_sink.stream_sink, 
params.destinations));
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -377,16 +372,11 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         }
 
         // TODO: figure out good buffer size based on size of output row
-        bool send_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
-            _sink.reset(new ResultFileSinkOperatorX(
-                    next_sink_operator_id(), row_desc, 
thrift_sink.result_file_sink,
-                    params.destinations, 
send_query_statistics_with_every_batch, output_exprs,
-                    desc_tbl));
+            _sink.reset(new ResultFileSinkOperatorX(next_sink_operator_id(), 
row_desc,
+                                                    
thrift_sink.result_file_sink,
+                                                    params.destinations, 
output_exprs, desc_tbl));
         } else {
             _sink.reset(
                     new ResultFileSinkOperatorX(next_sink_operator_id(), 
row_desc, output_exprs));
@@ -431,10 +421,10 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
             // 2. create and set sink operator of data stream sender for new 
pipeline
 
             DataSinkOperatorXPtr sink_op;
-            sink_op.reset(new ExchangeSinkOperatorX(
-                    state, *_row_desc, next_sink_operator_id(),
-                    thrift_sink.multi_cast_stream_sink.sinks[i],
-                    thrift_sink.multi_cast_stream_sink.destinations[i], 
false));
+            sink_op.reset(
+                    new ExchangeSinkOperatorX(state, *_row_desc, 
next_sink_operator_id(),
+                                              
thrift_sink.multi_cast_stream_sink.sinks[i],
+                                              
thrift_sink.multi_cast_stream_sink.destinations[i]));
 
             static_cast<void>(new_pipeline->set_sink(sink_op));
             {
@@ -605,7 +595,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
         auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t 
pip_idx) {
             DCHECK(pipeline_id_to_profile[pip_idx]);
-            RETURN_IF_ERROR(task->prepare(local_params, 
request.fragment.output_sink));
+            RETURN_IF_ERROR(
+                    task->prepare(local_params, request.fragment.output_sink, 
_query_ctx.get()));
             return Status::OK();
         };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 4c0c0af1cd4..29bc70fce67 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -67,7 +67,8 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t 
task_id, RuntimeSta
     pipeline->incr_created_tasks();
 }
 
-Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, 
const TDataSink& tsink) {
+Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, 
const TDataSink& tsink,
+                              QueryContext* query_ctx) {
     DCHECK(_sink);
     DCHECK(_cur_state == PipelineTaskState::NOT_READY) << 
get_state_name(_cur_state);
     _init_profile();
@@ -97,6 +98,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& 
local_params, const
                              _le_state_map,  _task_idx,   
_source_dependency[op->operator_id()]};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
+        query_ctx->register_query_statistics(
+                
_state->get_local_state(op->operator_id())->query_statistics_ptr());
     }
 
     _block = doris::vectorized::Block::create_unique();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index f7b996f40a7..c9e17727c70 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -62,7 +62,8 @@ public:
         return Status::InternalError("Should not reach here!");
     }
 
-    Status prepare(const TPipelineInstanceParams& local_params, const 
TDataSink& tsink);
+    Status prepare(const TPipelineInstanceParams& local_params, const 
TDataSink& tsink,
+                   QueryContext* query_ctx);
 
     Status execute(bool* eos) override;
 
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 91cb032c765..d746a6aca10 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -91,7 +91,9 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, 
int buffer_size)
           _is_cancelled(false),
           _buffer_rows(0),
           _buffer_limit(buffer_size),
-          _packet_num(0) {}
+          _packet_num(0) {
+    _query_statistics = std::make_unique<QueryStatistics>();
+}
 
 BufferControlBlock::~BufferControlBlock() {
     static_cast<void>(cancel());
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index f75008f1016..d84bcba6818 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -91,11 +91,7 @@ public:
 
     [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
-        _query_statistics = statistics;
-    }
-
-    void update_num_written_rows(int64_t num_rows) {
+    void update_return_rows(int64_t num_rows) {
         // _query_statistics may be null when the result sink init failed
         // or some other failure.
         // and the number of written rows is only needed when all things go 
well.
@@ -104,13 +100,6 @@ public:
         }
     }
 
-    void update_max_peak_memory_bytes() {
-        if (_query_statistics != nullptr) {
-            int64_t max_peak_memory_bytes = 
_query_statistics->calculate_max_peak_memory_bytes();
-            
_query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
-        }
-    }
-
 protected:
     virtual bool _get_batch_queue_empty() {
         return _fe_result_batch_queue.empty() && 
_arrow_flight_batch_queue.empty();
@@ -142,10 +131,8 @@ protected:
 
     std::deque<GetResultBatchCtx*> _waiting_rpc;
 
-    // It is shared with PlanFragmentExecutor and will be called in two 
different
-    // threads. But their calls are all at different time, there is no problem 
of
-    // multithreading access.
-    std::shared_ptr<QueryStatistics> _query_statistics;
+    // only used for FE using return rows to check limit
+    std::unique_ptr<QueryStatistics> _query_statistics;
 };
 
 class PipBufferControlBlock : public BufferControlBlock {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 5ac1bffdad3..edc765ce879 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -74,6 +74,7 @@ class MemTracker;
 class StorageEngine;
 class ResultBufferMgr;
 class ResultQueueMgr;
+class RuntimeQueryStatiticsMgr;
 class TMasterInfo;
 class LoadChannelMgr;
 class LoadStreamMgr;
@@ -153,6 +154,9 @@ public:
     pipeline::TaskScheduler* pipeline_task_group_scheduler() { return 
_with_group_task_scheduler; }
     taskgroup::TaskGroupManager* task_group_manager() { return 
_task_group_manager; }
     WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return 
_workload_sched_mgr; }
+    RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
+        return _runtime_query_statistics_mgr;
+    }
 
     // using template to simplify client cache management
     template <typename T>
@@ -381,6 +385,8 @@ private:
     doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = 
nullptr;
 
     WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
+
+    RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index dd54e6b10c2..a3269e9d339 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -70,6 +70,7 @@
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/routine_load/routine_load_task_executor.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/small_file_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
@@ -196,6 +197,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               .set_max_queue_size(1000000)
                               .build(&_lazy_release_obj_pool));
 
+    // NOTE: the runtime query statistics mgr can be accessed by query and daemon 
threads,
+    // so it must be created before any query begins and deleted only after all 
queries and daemon threads have stopped
+    _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
     init_file_cache_factory();
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _task_group_manager = new taskgroup::TaskGroupManager();
@@ -633,6 +637,10 @@ void ExecEnv::destroy() {
     // info is deconstructed then BE process will core at coordinator back 
method in fragment mgr.
     SAFE_DELETE(_master_info);
 
+    // NOTE: the runtime query statistics mgr can be accessed by query and daemon 
threads,
+    // so it must be created before any query begins and deleted only after all 
queries and daemon threads have stopped
+    SAFE_DELETE(_runtime_query_statistics_mgr);
+
     LOG(INFO) << "Doris exec envorinment is destoried.";
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8cf11047b62..d18eb2ef002 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -223,7 +223,6 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
     if (req.query_statistics) {
         // use to report 'insert into select'
         TQueryStatistics queryStatistics;
-        DCHECK(req.query_statistics->collect_dml_statistics());
         req.query_statistics->to_thrift(&queryStatistics);
         params.__set_query_statistics(queryStatistics);
     }
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 896178946e3..9c40e2e9f34 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -92,10 +92,11 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* 
exec_env,
           _closed(false),
           _is_report_success(false),
           _is_report_on_cancel(true),
-          _collect_query_statistics_with_every_batch(false),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
     _report_thread_future = _report_thread_promise.get_future();
     _start_time = VecDateTimeValue::local_time();
+    _query_statistics = std::make_shared<QueryStatistics>();
+    _query_ctx->register_query_statistics(_query_statistics);
 }
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
@@ -231,11 +232,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
         if (sink_profile != nullptr) {
             profile()->add_child(sink_profile, true, nullptr);
         }
-
-        _collect_query_statistics_with_every_batch =
-                params.__isset.send_query_statistics_with_every_batch
-                        ? params.send_query_statistics_with_every_batch
-                        : false;
     } else {
         // _sink is set to nullptr
         _sink.reset(nullptr);
@@ -254,11 +250,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
 
     VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
     _prepared = true;
-
-    _query_statistics.reset(new 
QueryStatistics(request.query_options.query_type));
-    if (_sink != nullptr) {
-        _sink->set_query_statistics(_query_statistics);
-    }
     return Status::OK();
 }
 
@@ -336,10 +327,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
             st = get_vectorized_internal(block.get(), &eos);
             RETURN_IF_ERROR(st);
 
-            // Collect this plan and sub plan statistics, and send to parent 
plan.
-            if (_collect_query_statistics_with_every_batch) {
-                _collect_query_statistics();
-            }
+            _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / 
NANOS_PER_MILLIS);
 
             if (!eos || block->rows() > 0) {
                 st = _sink->send(runtime_state(), block.get());
@@ -351,7 +339,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         }
     }
     {
-        _collect_query_statistics();
         Status status;
         {
             std::lock_guard<std::mutex> l(_status_lock);
@@ -430,44 +417,6 @@ bool PlanFragmentExecutor::is_timeout(const 
VecDateTimeValue& now) const {
     return false;
 }
 
-void PlanFragmentExecutor::_collect_query_statistics() {
-    _query_statistics->clear();
-    Status status;
-    /// TODO(yxc):
-    // The judgment of enable_local_exchange here is a bug, it should not need 
to be checked. I will fix this later.
-    bool _is_local = false;
-    if (_runtime_state->query_options().__isset.enable_local_exchange) {
-        _is_local = _runtime_state->query_options().enable_local_exchange;
-    }
-
-    if (_is_local) {
-        if (_runtime_state->num_per_fragment_instances() == 1) {
-            status = _plan->collect_query_statistics(_query_statistics.get());
-        } else {
-            status = _plan->collect_query_statistics(_query_statistics.get(),
-                                                     
_runtime_state->per_fragment_instance_idx());
-        }
-    } else {
-        status = _plan->collect_query_statistics(_query_statistics.get());
-    }
-
-    if (!status.ok()) {
-        LOG(INFO) << "collect query statistics failed, st=" << status;
-        return;
-    }
-    _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / 
NANOS_PER_MILLIS);
-    if (_runtime_state->backend_id() != -1) {
-        _collect_node_statistics();
-    }
-}
-
-void PlanFragmentExecutor::_collect_node_statistics() {
-    DCHECK(_runtime_state->backend_id() != -1);
-    NodeStatistics* node_statistics =
-            
_query_statistics->add_nodes_statistics(_runtime_state->backend_id());
-    
node_statistics->set_peak_memory(_runtime_state->query_mem_tracker()->peak_consumption());
-}
-
 void PlanFragmentExecutor::report_profile() {
     SCOPED_ATTACH_TASK(_runtime_state.get());
     VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
@@ -559,7 +508,7 @@ void PlanFragmentExecutor::send_report(bool done) {
             std::bind(&PlanFragmentExecutor::update_status, this, 
std::placeholders::_1),
             std::bind(&PlanFragmentExecutor::cancel, this, 
std::placeholders::_1,
                       std::placeholders::_2),
-            _dml_query_statistics()};
+            _query_ctx->get_query_statistics()};
     // This will send a report even if we are cancelled.  If the query 
completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the 
coordinator will
     // be waiting for a final report and profile.
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 41817e5308d..051448f13fa 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -47,7 +47,6 @@ class DataSink;
 class DescriptorTbl;
 class ExecEnv;
 class ObjectPool;
-class QueryStatistics;
 struct ReportStatusRequest;
 
 namespace vectorized {
@@ -231,12 +230,6 @@ private:
 
     VecDateTimeValue _start_time;
 
-    // It is shared with BufferControlBlock and will be called in two different
-    // threads. But their calls are all at different time, there is no problem 
of
-    // multithreaded access.
-    std::shared_ptr<QueryStatistics> _query_statistics;
-    bool _collect_query_statistics_with_every_batch;
-
     // Record the cancel information when calling the cancel() method, return 
it to FE
     PPlanFragmentCancelReason _cancel_reason;
     std::string _cancel_msg;
@@ -275,16 +268,9 @@ private:
 
     const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); 
}
 
-    void _collect_query_statistics();
-
-    std::shared_ptr<QueryStatistics> _dml_query_statistics() {
-        if (_query_statistics && _query_statistics->collect_dml_statistics()) {
-            return _query_statistics;
-        }
-        return nullptr;
-    }
-
     void _collect_node_statistics();
+
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index e60f29b2915..bd7dee33b54 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -19,6 +19,7 @@
 
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_x/dependency.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 
 namespace doris {
 
@@ -73,6 +74,7 @@ QueryContext::~QueryContext() {
         
static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                 
std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
     }
+    
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -118,4 +120,15 @@ bool QueryContext::cancel(bool v, std::string msg, Status 
new_status, int fragme
     }
     return true;
 }
+
+void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> 
qs) {
+    
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(_query_id),
 qs,
+                                                                         
coord_addr);
+}
+
+std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
+    return 
_exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
+            print_id(_query_id));
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 203c5b6e3f4..9e906cbbe19 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -208,6 +208,10 @@ public:
 
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
 
+    void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
+
+    std::shared_ptr<QueryStatistics> get_query_statistics();
+
 public:
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index 02789e2dabe..7171803ce03 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -64,11 +64,11 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
 
 void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
     DCHECK(statistics != nullptr);
-    statistics->scan_bytes = scan_bytes;
-    statistics->scan_rows = scan_rows;
-    statistics->cpu_ms = cpu_ms;
-    statistics->returned_rows = returned_rows;
-    statistics->max_peak_memory_bytes = max_peak_memory_bytes;
+    statistics->__set_scan_bytes(scan_bytes);
+    statistics->__set_scan_rows(scan_rows);
+    statistics->__set_cpu_ms(cpu_ms);
+    statistics->__set_returned_rows(returned_rows);
+    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
 }
 
 void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index fa39c9ea183..8c4662ba59d 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -59,13 +59,8 @@ private:
 // or plan's statistics and QueryStatisticsRecvr is responsible for collecting 
it.
 class QueryStatistics {
 public:
-    QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT)
-            : scan_rows(0),
-              scan_bytes(0),
-              cpu_ms(0),
-              returned_rows(0),
-              max_peak_memory_bytes(0),
-              _query_type(query_type) {}
+    QueryStatistics()
+            : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), 
max_peak_memory_bytes(0) {}
     virtual ~QueryStatistics();
 
     void merge(const QueryStatistics& other);
@@ -103,8 +98,9 @@ public:
     void clearNodeStatistics();
 
     void clear() {
-        scan_rows = 0;
-        scan_bytes = 0;
+        scan_rows.store(0);
+        scan_bytes.store(0);
+
         cpu_ms = 0;
         returned_rows = 0;
         max_peak_memory_bytes = 0;
@@ -119,13 +115,13 @@ public:
     bool collected() const { return _collected; }
     void set_collected() { _collected = true; }
 
-    // LOAD does not need to collect information on the exchange node.
-    bool collect_dml_statistics() { return _query_type == TQueryType::LOAD; }
+    int64_t get_scan_rows() { return scan_rows.load(); }
+    int64_t get_scan_bytes() { return scan_bytes.load(); }
 
 private:
     friend class QueryStatisticsRecvr;
-    int64_t scan_rows;
-    int64_t scan_bytes;
+    std::atomic<int64_t> scan_rows;
+    std::atomic<int64_t> scan_bytes;
     int64_t cpu_ms;
     // number rows returned by query.
     // only set once by result sink when closing.
@@ -137,7 +133,6 @@ private:
     using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
     NodeStatisticsMap _nodes_statistics_map;
     bool _collected = false;
-    const TQueryType::type _query_type;
 };
 using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
new file mode 100644
index 00000000000..6a8aa3f5097
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_query_statistics_mgr.h"
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "util/debug_util.h"
+
+namespace doris {
+
+// Registers one QueryStatistics object under `query_id`. The first registration for a
+// query creates its QueryStatisticsCtx and stores `fe_addr` in it; every registration
+// (first or later) appends `qs_ptr` to that context's qs_list.
+void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
+                                                         std::shared_ptr<QueryStatistics> qs_ptr,
+                                                         TNetworkAddress fe_addr) {
+    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+    auto ctx_it = _query_statistics_ctx_map.find(query_id);
+    if (ctx_it == _query_statistics_ctx_map.end()) {
+        // unknown query: create its context and keep the iterator to the new entry
+        ctx_it = _query_statistics_ctx_map
+                         .emplace(query_id, std::make_unique<QueryStatisticsCtx>(fe_addr))
+                         .first;
+    }
+    ctx_it->second->qs_list.push_back(qs_ptr);
+}
+
+// Reports the runtime statistics of every registered query to its FE.
+//  1. Under the map lock, merge each query's registered QueryStatistics into one thrift
+//     struct and group the results by the FE address stored in the query's context.
+//  2. Without the lock, send one reportExecStatus rpc per FE address (one reopen + retry
+//     on a transport error).
+//  3. Under the map lock again, erase the queries that were already marked finished when
+//     the snapshot in step 1 was taken and whose FE received the report.
+void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
+    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    // 1 get query statistics map
+    std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
+    std::map<std::string, bool> query_finished;
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+            QueryStatistics tmp_qs;
+            for (auto& qs_ptr : qs_ctx_ptr->qs_list) {
+                tmp_qs.merge(*qs_ptr);
+            }
+            TQueryStatistics ret_t_qs;
+            tmp_qs.to_thrift(&ret_t_qs);
+            // operator[] creates the per-FE map the first time an address is seen
+            fe_qs_map[qs_ctx_ptr->fe_addr][query_id] = ret_t_qs;
+            // remember the flag as of this snapshot: a query that finishes after the
+            // snapshot is only erased by a later call, once its final numbers were sent
+            query_finished[query_id] = qs_ctx_ptr->is_query_finished;
+        }
+    }
+
+    // 2 report query statistics to fe
+    std::map<TNetworkAddress, bool> rpc_result;
+    for (auto& [addr, qs_map] : fe_qs_map) {
+        rpc_result[addr] = false;
+        // 2.1 get client
+        Status coord_status;
+        FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
+                                        &coord_status);
+        std::string add_str = PrintThriftNetworkAddress(addr);
+        if (!coord_status.ok()) {
+            LOG(WARNING) << "could not get client " << add_str
+                         << " when report workload runtime stats, reason is "
+                         << coord_status.to_string();
+            continue;
+        }
+
+        // 2.2 send report
+        TReportWorkloadRuntimeStatusParams report_runtime_params;
+        report_runtime_params.__set_backend_id(be_id);
+        report_runtime_params.__set_query_statistics_map(qs_map);
+
+        TReportExecStatusParams params;
+        // Use the generated setter like the fields above: a plain member assignment does
+        // not raise the __isset flag, and thrift skips optional fields whose flag is unset.
+        // NOTE(review): assumes report_workload_runtime_status is optional in the IDL -
+        // the setter is correct either way.
+        params.__set_report_workload_runtime_status(report_runtime_params);
+
+        TReportExecStatusResult res;
+        try {
+            coord->reportExecStatus(res, params);
+            rpc_result[addr] = true;
+        } catch (apache::thrift::transport::TTransportException& e) {
+            LOG(WARNING) << "report workload runtime stats to " << add_str
+                         << " failed,  err: " << e.what();
+            // transport problem: reopen the connection and retry once
+            Status rpc_status = coord.reopen();
+            if (!rpc_status.ok()) {
+                LOG(WARNING)
+                        << "reopen thrift client failed when report workload runtime statistics to "
+                        << add_str;
+            } else {
+                try {
+                    coord->reportExecStatus(res, params);
+                    rpc_result[addr] = true;
+                } catch (apache::thrift::TException& e2) {
+                    // TException is the base of TTransportException, so this also covers
+                    // non-transport failures of the retry
+                    LOG(WARNING) << "retry report workload runtime stats to " << add_str
+                                 << " failed,  err: " << e2.what();
+                }
+            }
+        } catch (apache::thrift::TException& e) {
+            // any other thrift failure: log it and keep the statistics for the next round
+            // instead of letting the exception escape this function
+            LOG(WARNING) << "report workload runtime stats to " << add_str
+                         << " failed,  err: " << e.what();
+        }
+    }
+
+    // 3 when a query is finished and its report was sent successfully, remove its statistics
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        for (auto& [addr, qs_map] : fe_qs_map) {
+            if (rpc_result[addr]) {
+                for (auto& [query_id, qs] : qs_map) {
+                    if (query_finished[query_id]) {
+                        _query_statistics_ctx_map.erase(query_id);
+                    }
+                }
+            }
+        }
+    }
+}
+
+// Marks the statistics context of `query_id` as finished. A query id without a
+// registered context is silently ignored.
+void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
+    // NOTE: here must be a write lock
+    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+    auto ctx_it = _query_statistics_ctx_map.find(query_id);
+    if (ctx_it == _query_statistics_ctx_map.end()) {
+        // A query may obtain its query_ctx and still fail before any node/operator is
+        // created; it then never registered statistics, so there is nothing to mark.
+        return;
+    }
+    ctx_it->second->is_query_finished = true;
+}
+
+// Returns a newly allocated QueryStatistics that is the merge of every statistics object
+// registered for `query_id`, or nullptr when no context exists for that query.
+std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
+        std::string query_id) {
+    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+    // Look the entry up once and reuse the iterator. std::map::operator[] is a non-const
+    // member that may insert, so it must not be used while only a shared lock is held.
+    auto ctx_it = _query_statistics_ctx_map.find(query_id);
+    if (ctx_it == _query_statistics_ctx_map.end()) {
+        return nullptr;
+    }
+    std::shared_ptr<QueryStatistics> qs_ptr = std::make_shared<QueryStatistics>();
+    for (auto const& qs : ctx_it->second->qs_list) {
+        qs_ptr->merge(*qs);
+    }
+    return qs_ptr;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
new file mode 100644
index 00000000000..b3fa4bbc408
--- /dev/null
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <shared_mutex>
+#include <string>
+
+#include "runtime/query_statistics.h"
+
+namespace doris {
+
+// Per-query bookkeeping held by RuntimeQueryStatiticsMgr, one instance per query id.
+class QueryStatisticsCtx {
+public:
+    QueryStatisticsCtx(TNetworkAddress fe_addr) : is_query_finished(false), fe_addr(fe_addr) {}
+    ~QueryStatisticsCtx() = default;
+
+    // every QueryStatistics object registered for the query
+    std::vector<std::shared_ptr<QueryStatistics>> qs_list;
+    // starts as false; RuntimeQueryStatiticsMgr::set_query_finished sets it to true
+    std::atomic<bool> is_query_finished;
+    // FE address given at construction; the manager reports this query's statistics to it
+    TNetworkAddress fe_addr;
+};
+
+// Keeps one QueryStatisticsCtx per query id and reports the merged statistics of the
+// registered queries to the FE. All access to the map goes through _qs_ctx_map_lock.
+class RuntimeQueryStatiticsMgr {
+public:
+    RuntimeQueryStatiticsMgr() = default;
+    ~RuntimeQueryStatiticsMgr() = default;
+
+    // Appends qs_ptr to the context of query_id; the context is created with fe_addr the
+    // first time a query id is seen.
+    void register_query_statistics(std::string query_id, 
std::shared_ptr<QueryStatistics> qs_ptr,
+                                   TNetworkAddress fe_addr);
+
+    // Merges the statistics of each registered query, sends them to the query's FE and
+    // erases the queries that were finished and whose report was sent successfully.
+    void report_runtime_query_statistics();
+
+    // Marks the context of query_id as finished; does nothing if none is registered.
+    void set_query_finished(std::string query_id);
+
+    // Returns a new object holding the merged statistics registered for query_id, or
+    // nullptr if the query id is not registered.
+    std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string 
query_id);
+
+private:
+    // guards _query_statistics_ctx_map
+    std::shared_mutex _qs_ctx_map_lock;
+    // query id -> statistics context of that query
+    std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 8ac1af004d9..e1f39f2948b 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -80,21 +80,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
     }
 }
 
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!_is_pipeline_scan || _should_create_scanner) {
-        statistics->add_scan_bytes(_byte_read_counter->value());
-        statistics->add_scan_rows(_rows_read_counter->value());
-        statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
-    }
-    return Status::OK();
-}
-
-Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, 
int) {
-    RETURN_IF_ERROR(collect_query_statistics(statistics));
-    return Status::OK();
-}
-
 Status NewOlapScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
     // if you want to add some profile in scan node, even it have not new 
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 309bac56991..ca357b7eb7d 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -63,8 +63,6 @@ public:
     friend class doris::pipeline::OlapScanOperator;
 
     Status prepare(RuntimeState* state) override;
-    Status collect_query_statistics(QueryStatistics* statistics) override;
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
 
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 2a56a733b48..cf52341b34f 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1332,6 +1332,9 @@ Status VScanNode::_prepare_scanners(const int 
query_parallel_instance_num) {
     if (scanners.empty()) {
         _eos = true;
     } else {
+        for (auto& scanner : scanners) {
+            scanner->set_query_statistics(_query_statistics.get());
+        }
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         _start_scanners(_scanners, query_parallel_instance_num);
     }
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 0a7a8c9c019..2cdd1d503bb 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -102,6 +102,8 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
         }
     }
 
+    int64_t old_scan_rows = _num_rows_read;
+    int64_t old_scan_bytes = _num_byte_read;
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to 
get next rows,
@@ -133,6 +135,11 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
+        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
+    }
+
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
     }
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 9fdaddcea21..23ddb65629c 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -32,6 +32,7 @@
 namespace doris {
 class RuntimeProfile;
 class TupleDescriptor;
+class QueryStatistics;
 
 namespace vectorized {
 class VExprContext;
@@ -148,6 +149,10 @@ public:
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
+    void set_query_statistics(QueryStatistics* query_statistics) {
+        _query_statistics = query_statistics;
+    }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -159,6 +164,8 @@ protected:
     RuntimeState* _state = nullptr;
     VScanNode* _parent = nullptr;
     pipeline::ScanLocalStateBase* _local_state = nullptr;
+    QueryStatistics* _query_statistics = nullptr;
+
     // Set if scan node has sort limit info
     int64_t _limit = -1;
 
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index 6baf6749ed1..5b3e38af56a 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -63,12 +63,11 @@ Status VExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_TIMER(_exec_timer);
     DCHECK_GT(_num_senders, 0);
-    _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
     CHECK(state->exec_env() != nullptr);
     CHECK(state->exec_env()->vstream_mgr() != nullptr);
     _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, _input_row_desc, state->fragment_instance_id(), _id, 
_num_senders,
-            _runtime_profile.get(), _is_merging, 
_sub_plan_query_statistics_recvr);
+            _runtime_profile.get(), _is_merging);
 
     if (_is_merging) {
         RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, 
_row_descriptor));
@@ -145,20 +144,6 @@ void VExchangeNode::release_resource(RuntimeState* state) {
     ExecNode::release_resource(state);
 }
 
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!statistics->collect_dml_statistics()) {
-        statistics->merge(_sub_plan_query_statistics_recvr.get());
-    }
-    return Status::OK();
-}
-Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, 
int sender_id) {
-    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
-    if (!statistics->collect_dml_statistics()) {
-        statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
-    }
-    return Status::OK();
-}
 Status VExchangeNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 94302e84d9b..e49eb86a92c 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -32,7 +32,6 @@ namespace doris {
 class DorisNodesInfo;
 class ObjectPool;
 class QueryStatistics;
-class QueryStatisticsRecvr;
 class RuntimeState;
 class TPlanNode;
 
@@ -55,8 +54,6 @@ public:
     Status open(RuntimeState* state) override;
     Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
     void release_resource(RuntimeState* state) override;
-    Status collect_query_statistics(QueryStatistics* statistics) override;
-    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
     Status close(RuntimeState* state) override;
 
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
@@ -67,7 +64,6 @@ private:
     bool _is_ready;
     std::shared_ptr<VDataStreamRecvr> _stream_recvr;
     RowDescriptor _input_row_desc;
-    std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 425e92e92e1..1210cd811d1 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -57,14 +57,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const 
TUniqueId& fragment_instanc
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
         RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging,
-        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) 
{
+        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging) {
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << 
print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
-    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
-            this, state, row_desc, fragment_instance_id, dest_node_id, 
num_senders, is_merging,
-            profile, sub_plan_query_statistics_recvr));
+    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state, 
row_desc,
+                                                                 
fragment_instance_id, dest_node_id,
+                                                                 num_senders, 
is_merging, profile));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     std::lock_guard<std::mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
@@ -127,12 +126,6 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
         // then the upstream node may report error status to FE, the query is 
failed.
         return Status::EndOfFile("data stream receiver is deconstructed");
     }
-    // request can only be used before calling recvr's add_batch or when 
request
-    // is the last for the sender, because request maybe released after it's 
batch
-    // is consumed by ExchangeNode.
-    if (request->has_query_statistics()) {
-        recvr->add_sub_plan_statistics(request->query_statistics(), 
request->sender_id());
-    }
 
     bool eos = request->eos();
     if (request->has_block()) {
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index d809ff96fbd..853d9846211 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -39,7 +39,6 @@ namespace doris {
 class RuntimeState;
 class RowDescriptor;
 class RuntimeProfile;
-class QueryStatisticsRecvr;
 class PTransmitDataParams;
 
 namespace vectorized {
@@ -50,11 +49,11 @@ public:
     VDataStreamMgr();
     ~VDataStreamMgr();
 
-    std::shared_ptr<VDataStreamRecvr> create_recvr(
-            RuntimeState* state, const RowDescriptor& row_desc,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, 
int num_senders,
-            RuntimeProfile* profile, bool is_merging,
-            std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
+    std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+                                                   const RowDescriptor& 
row_desc,
+                                                   const TUniqueId& 
fragment_instance_id,
+                                                   PlanNodeId dest_node_id, 
int num_senders,
+                                                   RuntimeProfile* profile, 
bool is_merging);
 
     std::shared_ptr<VDataStreamRecvr> find_recvr(const TUniqueId& 
fragment_instance_id,
                                                  PlanNodeId node_id, bool 
acquire_lock = true);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index b75bd4c21bf..dfc574591b3 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -336,11 +336,10 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(
-        VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& 
row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
-        bool is_merging, RuntimeProfile* profile,
-        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* 
state,
+                                   const RowDescriptor& row_desc,
+                                   const TUniqueId& fragment_instance_id, 
PlanNodeId dest_node_id,
+                                   int num_senders, bool is_merging, 
RuntimeProfile* profile)
         : HasTaskExecutionCtx(state),
           _mgr(stream_mgr),
 #ifdef USE_MEM_TRACKER
@@ -354,7 +353,6 @@ VDataStreamRecvr::VDataStreamRecvr(
           _is_closed(false),
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
-          _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
           _enable_pipeline(state->enable_pipeline_exec()),
           _mem_available(std::make_shared<bool>(true)) {
     // DataStreamRecvr may be destructed after the instance execution thread 
ends.
@@ -483,17 +481,6 @@ void VDataStreamRecvr::remove_sender(int sender_id, int 
be_number, Status exec_s
     _sender_queues[use_sender_id]->decrement_senders(be_number);
 }
 
-void VDataStreamRecvr::remove_sender(int sender_id, int be_number, 
QueryStatisticsPtr statistics,
-                                     Status exec_status) {
-    if (!exec_status.ok()) {
-        cancel_stream(exec_status);
-        return;
-    }
-    int use_sender_id = _is_merging ? sender_id : 0;
-    _sender_queues[use_sender_id]->decrement_senders(be_number);
-    _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
-}
-
 void VDataStreamRecvr::cancel_stream(Status exec_status) {
     VLOG_QUERY << "cancel_stream: fragment_instance_id=" << 
print_id(_fragment_instance_id)
                << exec_status;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 7f9436cba5a..5e64268276e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -56,7 +56,6 @@ namespace doris {
 class MemTracker;
 class PBlock;
 class MemTrackerLimiter;
-class PQueryStatistics;
 class RuntimeState;
 
 namespace pipeline {
@@ -76,8 +75,7 @@ public:
     class SenderQueue;
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const 
RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-                     int num_senders, bool is_merging, RuntimeProfile* profile,
-                     std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
+                     int num_senders, bool is_merging, RuntimeProfile* 
profile);
 
     virtual ~VDataStreamRecvr();
 
@@ -103,17 +101,10 @@ public:
     PlanNodeId dest_node_id() const { return _dest_node_id; }
     const RowDescriptor& row_desc() const { return _row_desc; }
 
-    void add_sub_plan_statistics(const PQueryStatistics& statistics, int 
sender_id) {
-        _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
-    }
-
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
     void remove_sender(int sender_id, int be_number, Status exec_status);
 
-    void remove_sender(int sender_id, int be_number, QueryStatisticsPtr 
statistics,
-                       Status exec_status);
-
     void cancel_stream(Status exec_status);
 
     void close();
@@ -184,8 +175,6 @@ private:
     // Number of blocks received
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
 
-    std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
-
     bool _enable_pipeline;
     std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
             _sender_to_local_channel_dependency;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 53c140ef5a8..6c4d10839e0 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -152,13 +152,7 @@ Status Channel<Parent>::send_local_block(Status 
exec_status, bool eos) {
 
         _local_recvr->add_block(&block, _parent->sender_id(), true);
         if (eos) {
-            /// TODO: Supported on pipelineX, we can hold QueryStatistics on 
the fragment instead of on instances.
-            if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number,
-                                            _parent->query_statisticsPtr(), 
exec_status);
-            } else {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
-            }
+            _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
         }
         return Status::OK();
     } else {
@@ -199,10 +193,6 @@ Status Channel<Parent>::send_remote_block(PBlock* block, 
bool eos, Status exec_s
     VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << 
print_id(_fragment_instance_id)
              << " dest_node=" << _dest_node_id << " to_host=" << 
_brpc_dest_addr.hostname
              << " _packet_seq=" << _packet_seq << " row_desc=" << 
_row_desc.debug_string();
-    if (_is_transfer_chain && (_send_query_statistics_with_every_batch || 
eos)) {
-        auto statistic = _brpc_request->mutable_query_statistics();
-        _parent->query_statistics()->to_pb(statistic);
-    }
 
     _brpc_request->set_eos(eos);
     if (!exec_status.ok()) {
@@ -289,12 +279,7 @@ Status Channel<Parent>::close_internal(Status exec_status) 
{
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (is_local()) {
             if (_recvr_is_valid()) {
-                if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
-                    _local_recvr->remove_sender(_parent->sender_id(), 
_be_number,
-                                                
_parent->query_statisticsPtr(), exec_status);
-                } else {
-                    _local_recvr->remove_sender(_parent->sender_id(), 
_be_number, exec_status);
-                }
+                _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
             }
         } else {
             status = send_remote_block((PBlock*)nullptr, true, exec_status);
@@ -329,8 +314,7 @@ void Channel<Parent>::ch_roll_pb_block() {
 
 VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, 
int sender_id,
                                      const RowDescriptor& row_desc, const 
TDataStreamSink& sink,
-                                     const 
std::vector<TPlanFragmentDestination>& destinations,
-                                     bool 
send_query_statistics_with_every_batch)
+                                     const 
std::vector<TPlanFragmentDestination>& destinations)
         : DataSink(row_desc),
           _sender_id(sender_id),
           _state(state),
@@ -351,21 +335,17 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
     _enable_pipeline_exec = state->enable_pipeline_exec();
 
     for (int i = 0; i < destinations.size(); ++i) {
-        // Select first dest as transfer chain.
-        bool is_transfer_chain = (i == 0);
         const auto& fragment_instance_id = 
destinations[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
             if (_enable_pipeline_exec) {
                 _channel_shared_ptrs.emplace_back(new 
PipChannel<VDataStreamSender>(
                         this, row_desc, destinations[i].brpc_server, 
fragment_instance_id,
-                        sink.dest_node_id, is_transfer_chain,
-                        send_query_statistics_with_every_batch));
+                        sink.dest_node_id));
             } else {
                 _channel_shared_ptrs.emplace_back(
                         new Channel(this, row_desc, 
destinations[i].brpc_server,
-                                    fragment_instance_id, sink.dest_node_id, 
is_transfer_chain,
-                                    send_query_statistics_with_every_batch));
+                                    fragment_instance_id, sink.dest_node_id));
             }
             fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                  _channel_shared_ptrs.size() - 
1);
@@ -388,8 +368,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
 
 VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, 
int sender_id,
                                      const RowDescriptor& row_desc, PlanNodeId 
dest_node_id,
-                                     const 
std::vector<TPlanFragmentDestination>& destinations,
-                                     bool 
send_query_statistics_with_every_batch)
+                                     const 
std::vector<TPlanFragmentDestination>& destinations)
         : DataSink(row_desc),
           _sender_id(sender_id),
           _state(state),
@@ -405,9 +384,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
         const auto& fragment_instance_id = 
destinations[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            _channel_shared_ptrs.emplace_back(
-                    new Channel(this, row_desc, destinations[i].brpc_server, 
fragment_instance_id,
-                                _dest_node_id, false, 
send_query_statistics_with_every_batch));
+            _channel_shared_ptrs.emplace_back(new Channel(this, row_desc,
+                                                          
destinations[i].brpc_server,
+                                                          
fragment_instance_id, _dest_node_id));
         }
         fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                              _channel_shared_ptrs.size() - 1);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index f59dad266f8..ca020d9bab8 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -108,13 +108,11 @@ public:
     friend class pipeline::ExchangeSinkOperator;
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                       const RowDescriptor& row_desc, const TDataStreamSink& 
sink,
-                      const std::vector<TPlanFragmentDestination>& 
destinations,
-                      bool send_query_statistics_with_every_batch);
+                      const std::vector<TPlanFragmentDestination>& 
destinations);
 
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                       const RowDescriptor& row_desc, PlanNodeId dest_node_id,
-                      const std::vector<TPlanFragmentDestination>& 
destinations,
-                      bool send_query_statistics_with_every_batch);
+                      const std::vector<TPlanFragmentDestination>& 
destinations);
 
     ~VDataStreamSender() override;
 
@@ -145,8 +143,6 @@ public:
         return _split_block_distribute_by_channel_timer;
     }
     MemTracker* mem_tracker() { return _mem_tracker.get(); }
-    QueryStatistics* query_statistics() { return _query_statistics.get(); }
-    QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
     bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; }
     RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
     segment_v2::CompressionTypePB compression_type() const { return 
_compression_type; }
@@ -237,8 +233,7 @@ public:
     // how much tuple data is getting accumulated before being sent; it only 
applies
     // when data is added via add_row() and not sent directly via send_batch().
     Channel(Parent* parent, const RowDescriptor& row_desc, const 
TNetworkAddress& brpc_dest,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, 
bool is_transfer_chain,
-            bool send_query_statistics_with_every_batch)
+            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
             : _parent(parent),
               _row_desc(row_desc),
               _fragment_instance_id(fragment_instance_id),
@@ -248,8 +243,6 @@ public:
               _need_close(false),
               _closed(false),
               _brpc_dest_addr(brpc_dest),
-              _is_transfer_chain(is_transfer_chain),
-              
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
               _is_local((_brpc_dest_addr.hostname == 
BackendOptions::get_localhost()) &&
                         (_brpc_dest_addr.port == config::brpc_port)),
               _serializer(_parent, _is_local) {
@@ -380,9 +373,6 @@ protected:
     std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> 
_send_remote_block_callback;
     Status _receiver_status;
     int32_t _brpc_timeout_ms = 500;
-    // whether the dest can be treated as query statistics transfer chain.
-    bool _is_transfer_chain;
-    bool _send_query_statistics_with_every_batch;
     RuntimeState* _state = nullptr;
 
     bool _is_local;
@@ -442,10 +432,8 @@ template <typename Parent = VDataStreamSender>
 class PipChannel final : public Channel<Parent> {
 public:
     PipChannel(Parent* parent, const RowDescriptor& row_desc, const 
TNetworkAddress& brpc_dest,
-               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-               bool is_transfer_chain, bool 
send_query_statistics_with_every_batch)
-            : Channel<Parent>(parent, row_desc, brpc_dest, 
fragment_instance_id, dest_node_id,
-                              is_transfer_chain, 
send_query_statistics_with_every_batch) {
+               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
+            : Channel<Parent>(parent, row_desc, brpc_dest, 
fragment_instance_id, dest_node_id) {
         ch_roll_pb_block();
     }
 
diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp
index 5bb0de4b506..02d77fa6d42 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -32,7 +32,6 @@
 #include "vec/exprs/vexpr.h"
 
 namespace doris {
-class QueryStatistics;
 class TExpr;
 } // namespace doris
 
@@ -45,15 +44,13 @@ VResultFileSink::VResultFileSink(const RowDescriptor& 
row_desc,
 VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int 
sender_id,
                                  const RowDescriptor& row_desc, const 
TResultFileSink& sink,
                                  const std::vector<TPlanFragmentDestination>& 
destinations,
-                                 bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr, 
DescriptorTbl& descs)
         : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc, 
t_output_expr),
           
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) 
{
     _is_top_sink = false;
     CHECK_EQ(destinations.size(), 1);
     _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, 
row_desc, sink.dest_node_id,
-                                               destinations,
-                                               
send_query_statistics_with_every_batch));
+                                               destinations));
 }
 
 Status VResultFileSink::init(const TDataSink& tsink) {
@@ -127,7 +124,7 @@ Status VResultFileSink::close(RuntimeState* state, Status 
exec_status) {
     if (_is_top_sink) {
         // close sender, this is normal path end
         if (_sender) {
-            _sender->update_num_written_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
+            _sender->update_return_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
             static_cast<void>(_sender->close(final_status));
         }
         static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
@@ -148,12 +145,4 @@ Status VResultFileSink::close(RuntimeState* state, Status 
exec_status) {
     return Status::OK();
 }
 
-void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-    if (_is_top_sink) {
-        _sender->set_query_statistics(statistics);
-    } else {
-        _stream_sender->set_query_statistics(statistics);
-    }
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/vresult_file_sink.h 
b/be/src/vec/sink/vresult_file_sink.h
index b0d05823a50..65bc0492d89 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -34,7 +34,6 @@
 namespace doris {
 class BufferControlBlock;
 class ObjectPool;
-class QueryStatistics;
 class RuntimeProfile;
 class RuntimeState;
 class TDataSink;
@@ -54,7 +53,6 @@ public:
     VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
                     const RowDescriptor& row_desc, const TResultFileSink& sink,
                     const std::vector<TPlanFragmentDestination>& destinations,
-                    bool send_query_statistics_with_every_batch,
                     const std::vector<TExpr>& t_output_expr, DescriptorTbl& 
descs);
 
     Status init(const TDataSink& thrift_sink) override;
@@ -65,8 +63,6 @@ public:
     // hosts. Further send() calls are illegal after calling close().
     Status close(RuntimeState* state, Status exec_status) override;
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
-
 private:
     // set file options when sink type is FILE
     std::unique_ptr<ResultFileOptions> _file_opts;
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 3fa2e035976..59bf82483c5 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -41,7 +41,6 @@
 #include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris {
-class QueryStatistics;
 class RowDescriptor;
 class TExpr;
 
@@ -169,9 +168,8 @@ Status VResultSink::close(RuntimeState* state, Status 
exec_status) {
     // close sender, this is normal path end
     if (_sender) {
         if (_writer) {
-            _sender->update_num_written_rows(_writer->get_written_rows());
+            _sender->update_return_rows(_writer->get_written_rows());
         }
-        _sender->update_max_peak_memory_bytes();
         static_cast<void>(_sender->close(final_status));
     }
     static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
@@ -180,9 +178,5 @@ Status VResultSink::close(RuntimeState* state, Status 
exec_status) {
     return DataSink::close(state, exec_status);
 }
 
-void VResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
-    _sender->set_query_statistics(statistics);
-}
-
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index 0cde7399c45..0dd69ee84cd 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -34,7 +34,6 @@ namespace doris {
 class RuntimeState;
 class RuntimeProfile;
 class BufferControlBlock;
-class QueryStatistics;
 class ResultWriter;
 class RowDescriptor;
 class TExpr;
@@ -138,8 +137,6 @@ public:
     // hosts. Further send() calls are illegal after calling close().
     Status close(RuntimeState* state, Status exec_status) override;
 
-    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
-
 private:
     Status prepare_exprs(RuntimeState* state);
     Status second_phase_fetch_data(RuntimeState* state, Block* final_block);
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp 
b/be/test/vec/runtime/vdata_stream_test.cpp
index 2ac8f8a6482..86e6803fd6b 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -171,9 +171,8 @@ TEST_F(VDataStreamTest, BasicTest) {
     int num_senders = 1;
     RuntimeProfile profile("profile");
     bool is_merge = false;
-    std::shared_ptr<QueryStatisticsRecvr> statistics = 
std::make_shared<QueryStatisticsRecvr>();
     auto recv = _instance.create_recvr(&runtime_stat, row_desc, uid, nid, 
num_senders, &profile,
-                                       is_merge, statistics);
+                                       is_merge);
 
     // Test Sender
     int sender_id = 1;
@@ -194,10 +193,8 @@ TEST_F(VDataStreamTest, BasicTest) {
         dest.__set_server(addr);
         dests.push_back(dest);
     }
-    bool send_query_statistics_with_every_batch = false;
     VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id, 
row_desc, tsink.stream_sink,
-                             dests, send_query_statistics_with_every_batch);
-    sender.set_query_statistics(std::make_shared<QueryStatistics>());
+                             dests);
     static_cast<void>(sender.init(tsink));
     static_cast<void>(sender.prepare(&runtime_stat));
     static_cast<void>(sender.open(&runtime_stat));
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 248b0f0dbe7..af9d9bee4f4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2337,6 +2337,16 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int workload_max_action_num_in_policy = 5; // mainly used to 
limit set session var action
 
+    @ConfField(mutable = true)
+    public static int workload_runtime_status_thread_interval_ms = 2000;
+
+    // NOTE: it should be bigger than BE config report_query_statistics_interval_ms
+    @ConfField(mutable = true)
+    public static int query_audit_log_timeout_ms = 5000;
+
+    @ConfField(mutable = true)
+    public static int be_report_query_statistics_timeout_ms = 60000;
+
     @ConfField(mutable = true, masterOnly = true)
     public static int workload_group_max_num = 15;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 3b3f5939fd4..2f33e12da47 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -230,6 +230,7 @@ import org.apache.doris.qe.QueryCancelWorker;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
 import 
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -491,6 +492,9 @@ public class Env {
     private WorkloadGroupMgr workloadGroupMgr;
 
     private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;
+
+    private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
+
     private QueryStats queryStats;
 
     private StatisticsCleaner statisticsCleaner;
@@ -739,6 +743,7 @@ public class Env {
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
+        this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
         this.queryStats = new QueryStats();
         this.loadManagerAdapter = new LoadManagerAdapter();
         this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -835,6 +840,10 @@ public class Env {
         return workloadSchedPolicyMgr;
     }
 
+    public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() {
+        return workloadRuntimeStatusMgr;
+    }
+
     // use this to get correct ClusterInfoService instance
     public static SystemInfoService getCurrentSystemInfo() {
         return getCurrentEnv().getClusterInfo();
@@ -1014,6 +1023,7 @@ public class Env {
         workloadGroupMgr.startUpdateThread();
         workloadSchedPolicyMgr.start();
         workloadActionPublisherThread.start();
+        workloadRuntimeStatusMgr.start();
     }
 
     // wait until FE is ready.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 01a03e8c267..732d33c5e18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -100,6 +100,8 @@ public class AuditEvent {
     @AuditField(value = "FuzzyVariables")
     public String fuzzyVariables = "";
 
+    public long pushToAuditLogQueueTime;
+
     public static class AuditEventBuilder {
 
         private AuditEvent auditEvent = new AuditEvent();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 4181f66786c..83cd1d401f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -119,6 +119,6 @@ public class AuditLogHelper {
                 }
             }
         }
-        
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+        
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 8daf40054ad..e4d8f8273f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -271,7 +271,8 @@ public abstract class ConnectProcessor {
                         break;
                     }
                 }
-                auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true);
+                auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(),
+                        true);
                 // execute failed, skip remaining stmts
                 if (ctx.getState().getStateType() == MysqlStateType.ERR) {
                     break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index a3ee3c09e4e..b6d902b76cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -201,6 +201,15 @@ public final class QeProcessorImpl implements QeProcessor {
             LOG.debug("params: {}", params);
         }
         final TReportExecStatusResult result = new TReportExecStatusResult();
+
+        if (params.isSetReportWorkloadRuntimeStatus()) {
+            
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status);
+            if (!params.isSetQueryId()) {
+                result.setStatus(new TStatus(TStatusCode.OK));
+                return result;
+            }
+        }
+
         final QueryInfo info = coordinatorMap.get(params.query_id);
 
         if (info == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
new file mode 100644
index 00000000000..085d844e616
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadschedpolicy;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Tracks the runtime statistics of queries as reported by backends (BE) and
+ * delays audit logging of finished queries so that the audit record can be
+ * filled in with the statistics merged across all backends.
+ */
+public class WorkloadRuntimeStatusMgr {
+
+    private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
+    // backend id -> (query id -> statistics last reported by that backend)
+    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
+    // backend id -> time in ms when that backend last reported
+    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
+    // query id -> time in ms when any backend last reported that query
+    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    // guards queryAuditEventList; only the write lock is ever taken
+    private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
+    // finished queries waiting to be audited, in submission order
+    private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
+
+    /** Daemon that flushes due audit events and expires stale statistics. */
+    class WorkloadRuntimeStatsThread extends Daemon {
+
+        WorkloadRuntimeStatusMgr workloadStatsMgr;
+
+        public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName,
+                int interval) {
+            super(threadName, interval);
+            this.workloadStatsMgr = workloadRuntimeStatusMgr;
+        }
+
+        @Override
+        protected void runOneCycle() {
+            // 1 merge be query statistics
+            Map<String, TQueryStatistics> queryStatisticsMap = workloadStatsMgr.getQueryStatisticsMap();
+
+            // 2 log query audit, filling in the merged statistics when present
+            for (AuditEvent auditEvent : workloadStatsMgr.getQueryNeedAudit()) {
+                TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
+                if (queryStats != null) {
+                    auditEvent.scanRows = queryStats.scan_rows;
+                    auditEvent.scanBytes = queryStats.scan_bytes;
+                    auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
+                }
+                Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
+            }
+
+            // 3 clear beToQueryStatsMap when be report timeout
+            workloadStatsMgr.clearReportTimeoutBeStatistics();
+        }
+
+    }
+
+    private Daemon thread = null;
+
+    /** Queues a finished query; the daemon audits it after Config.query_audit_log_timeout_ms. */
+    public void submitFinishQueryToAudit(AuditEvent event) {
+        queryAuditEventLogWriteLock();
+        try {
+            event.pushToAuditLogQueueTime = System.currentTimeMillis();
+            queryAuditEventList.add(event);
+        } finally {
+            queryAuditEventLogWriteUnlock();
+        }
+    }
+
+    /** Removes and returns queued events older than Config.query_audit_log_timeout_ms. */
+    public List<AuditEvent> getQueryNeedAudit() {
+        List<AuditEvent> ret = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        queryAuditEventLogWriteLock();
+        try {
+            int queryAuditLogTimeout = Config.query_audit_log_timeout_ms;
+            Iterator<AuditEvent> iter = queryAuditEventList.iterator();
+            while (iter.hasNext()) {
+                AuditEvent ae = iter.next();
+                // the queue is in submission order, so stop at the first
+                // event that is not due yet
+                if (currentTime - ae.pushToAuditLogQueueTime <= queryAuditLogTimeout) {
+                    break;
+                }
+                ret.add(ae);
+                iter.remove();
+            }
+        } finally {
+            queryAuditEventLogWriteUnlock();
+        }
+        return ret;
+    }
+
+    /** Creates and starts the background daemon thread. */
+    public void start() {
+        thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread",
+                Config.workload_runtime_status_thread_interval_ms);
+        thread.start();
+    }
+
+    /** Records one backend report; reports without backend id or stats map are ignored. */
+    public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
+        if (!params.isSetBackendId()) {
+            LOG.warn("be report workload runtime status but without beid");
+            return;
+        }
+        if (!params.isSetQueryStatisticsMap()) {
+            LOG.warn("be report workload runtime status but without query stats map");
+            return;
+        }
+        long beId = params.backend_id;
+        long currentTime = System.currentTimeMillis();
+        beLastReportTime.put(beId, currentTime);
+        Map<String, TQueryStatistics> queryIdMap =
+                beToQueryStatsMap.computeIfAbsent(beId, id -> Maps.newConcurrentMap());
+        // Stamp every query, including those in a backend's first report;
+        // otherwise they would never hit the per-query timeout.
+        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+            queryIdMap.put(entry.getKey(), entry.getValue());
+            queryLastReportTime.put(entry.getKey(), currentTime);
+        }
+    }
+
+    /** Returns query id -> statistics merged over all backends. */
+    public Map<String, TQueryStatistics> getQueryStatisticsMap() {
+        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        for (Map<String, TQueryStatistics> beQueryMap : beToQueryStatsMap.values()) {
+            for (Map.Entry<String, TQueryStatistics> entry : beQueryMap.entrySet()) {
+                TQueryStatistics merged =
+                        retQueryMap.computeIfAbsent(entry.getKey(), id -> new TQueryStatistics());
+                mergeQueryStatistics(merged, entry.getValue());
+            }
+        }
+        return retQueryMap;
+    }
+
+    // Sums rows, bytes and cpu time; keeps the maximum of the peak memory.
+    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
+        dst.scan_rows += src.scan_rows;
+        dst.scan_bytes += src.scan_bytes;
+        dst.cpu_ms += src.cpu_ms;
+        if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
+            dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
+        }
+    }
+
+    /** Drops backends and queries silent for Config.be_report_query_statistics_timeout_ms. */
+    void clearReportTimeoutBeStatistics() {
+        long timeoutMs = Config.be_report_query_statistics_timeout_ms;
+
+        // 1 clear report timeout be
+        long currentTime = System.currentTimeMillis();
+        Set<Long> beNeedToRemove = new HashSet<>();
+        for (Long beId : beToQueryStatsMap.keySet()) {
+            Long lastReportTime = beLastReportTime.get(beId);
+            if (lastReportTime != null && currentTime - lastReportTime > timeoutMs) {
+                beNeedToRemove.add(beId);
+            }
+        }
+        for (Long beId : beNeedToRemove) {
+            beToQueryStatsMap.remove(beId);
+            beLastReportTime.remove(beId);
+        }
+
+        // 2 clear report timeout query
+        long newCurrentTime = System.currentTimeMillis();
+        Set<String> queryNeedToClear = new HashSet<>();
+        for (Map.Entry<String, Long> entry : queryLastReportTime.entrySet()) {
+            if (newCurrentTime - entry.getValue() > timeoutMs) {
+                queryNeedToClear.add(entry.getKey());
+            }
+        }
+        for (String queryId : queryNeedToClear) {
+            for (Map<String, TQueryStatistics> beQueryMap : beToQueryStatsMap.values()) {
+                beQueryMap.remove(queryId);
+            }
+            queryLastReportTime.remove(queryId);
+        }
+    }
+
+    private void queryAuditEventLogWriteLock() {
+        queryAuditEventLock.writeLock().lock();
+    }
+
+    private void queryAuditEventLogWriteUnlock() {
+        queryAuditEventLock.writeLock().unlock();
+    }
+}
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 24450e4d908..02d3efe50ed 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,11 @@ struct TQueryStatistics {
     5: optional i64 max_peak_memory_bytes
 }
 
+struct TReportWorkloadRuntimeStatusParams {
+    1: optional i64 backend_id
+    2: map<string, TQueryStatistics> query_statistics_map
+}
+
 // The results of an INSERT query, sent to the coordinator as part of
 // TReportExecStatusParams
 struct TReportExecStatusParams {
@@ -470,6 +475,8 @@ struct TReportExecStatusParams {
   23: optional list<TDetailedReportParams> detailed_report
 
   24: optional TQueryStatistics query_statistics
+
+  25: TReportWorkloadRuntimeStatusParams report_workload_runtime_status
 }
 
 struct TFeResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to