This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c988e73ebf [Feature](profile)add shuffle send rows/bytes #30456
6c988e73ebf is described below

commit 6c988e73ebf4ec2aac558a3cffc76f36baed06ad
Author: wangbo <wan...@apache.org>
AuthorDate: Sat Jan 27 17:09:17 2024 +0800

    [Feature](profile)add shuffle send rows/bytes #30456
---
 be/src/exec/data_sink.cpp                             | 16 ++++++++++++++++
 be/src/exec/data_sink.h                               |  7 ++++++-
 be/src/exec/exec_node.cpp                             |  2 +-
 be/src/pipeline/exec/exchange_sink_buffer.h           |  1 -
 be/src/pipeline/pipeline_x/operator.cpp               |  4 +++-
 be/src/pipeline/pipeline_x/operator.h                 |  6 +++++-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp        |  5 ++++-
 be/src/runtime/query_statistics.cpp                   | 11 +++++++----
 be/src/runtime/query_statistics.h                     | 19 +++++++++++++++++--
 be/src/vec/sink/vdata_stream_sender.cpp               |  3 +++
 .../org/apache/doris/plugin/audit/AuditEvent.java     |  4 ++++
 .../workloadschedpolicy/WorkloadRuntimeStatusMgr.java |  5 +++++
 .../ActiveQueriesTableValuedFunction.java             |  2 ++
 .../apache/doris/tablefunction/MetadataGenerator.java |  4 ++++
 gensrc/thrift/FrontendService.thrift                  |  2 ++
 15 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c58bbdb2523..5809912f4a2 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -30,6 +30,8 @@
 #include <string>
 
 #include "common/config.h"
+#include "runtime/query_context.h"
+#include "runtime/query_statistics.h"
 #include "vec/sink/async_writer_sink.h"
 #include "vec/sink/group_commit_block_sink.h"
 #include "vec/sink/multi_cast_data_stream_sink.h"
@@ -44,6 +46,14 @@ namespace doris {
 class DescriptorTbl;
 class TExpr;
 
+DataSink::DataSink(const RowDescriptor& desc) : _row_desc(desc) {
+    _query_statistics = std::make_shared<QueryStatistics>();
+}
+
+std::shared_ptr<QueryStatistics> DataSink::get_query_statistics_ptr() {
+    return _query_statistics;
+}
+
 Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                   const std::vector<TExpr>& output_exprs,
                                   const TPlanFragmentExecParams& params,
@@ -175,6 +185,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
 
     if (*sink != nullptr) {
         RETURN_IF_ERROR((*sink)->init(thrift_sink));
+        if (state->get_query_ctx()) {
+            state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
+        }
     }
 
     return Status::OK();
@@ -318,6 +331,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
     if (*sink != nullptr) {
         RETURN_IF_ERROR((*sink)->init(thrift_sink));
         RETURN_IF_ERROR((*sink)->prepare(state));
+        if (state->get_query_ctx()) {
+            state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
+        }
     }
 
     return Status::OK();
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index c0b27e8ae90..e08d40ab023 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -40,6 +40,7 @@ class TDataSink;
 class TExpr;
 class TPipelineFragmentParams;
 class TOlapTableSink;
+class QueryStatistics;
 
 namespace vectorized {
 class Block;
@@ -48,7 +49,7 @@ class Block;
 // Superclass of all data sinks.
 class DataSink {
 public:
-    DataSink(const RowDescriptor& desc) : _row_desc(desc) {}
+    DataSink(const RowDescriptor& desc);
     virtual ~DataSink() {}
 
     virtual Status init(const TDataSink& thrift_sink);
@@ -103,6 +104,8 @@ public:
 
     virtual bool can_write() { return true; }
 
+    std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
+
 private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
 
@@ -124,6 +127,8 @@ protected:
         _output_rows_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
         _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     }
+
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ac6e7eae9a0..368e94562a2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -255,7 +255,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
     // Step 1 Create current ExecNode according to current thrift plan node.
     ExecNode* cur_exec_node = nullptr;
     RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node));
-    if (cur_exec_node != nullptr) {
+    if (cur_exec_node != nullptr && state->get_query_ctx()) {
         state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics());
     }
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index f0b55d528ec..c04d2973d5e 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -273,7 +273,6 @@ private:
     static constexpr int QUEUE_CAPACITY_FACTOR = 64;
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
     std::shared_ptr<Dependency> _finish_dependency;
-    QueryStatistics* _statistics = nullptr;
     std::atomic<bool> _should_stop {false};
 };
 
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index db687865657..ef17ee70ed6 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -342,7 +342,9 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
 
 PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
                                                          RuntimeState* state)
-        : _parent(parent), _state(state) {}
+        : _parent(parent), _state(state) {
+    _query_statistics = std::make_shared<QueryStatistics>();
+}
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
         : _num_rows_returned(0),
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 1c076bd5a69..61afe40e126 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -111,7 +111,7 @@ public:
     //  override in Scan  MultiCastSink
     virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
 
-    std::shared_ptr<QueryStatistics> query_statistics_ptr() { return _query_statistics; }
+    std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
 
 protected:
     friend class OperatorXBase;
@@ -424,6 +424,8 @@ public:
     // override in exchange sink , AsyncWriterSink
     virtual Dependency* finishdependency() { return nullptr; }
 
+    std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
+
 protected:
     DataSinkOperatorXBase* _parent = nullptr;
     RuntimeState* _state = nullptr;
@@ -447,6 +449,8 @@ protected:
     RuntimeProfile::Counter* _exec_timer = nullptr;
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
     RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
+
+    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 class DataSinkOperatorXBase : public OperatorBase {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9a88c417be0..f0b87ce8613 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -92,6 +92,9 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
     auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                          _operators.front()->node_id(), no_scan_ranges);
     auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
+    query_ctx->register_query_statistics(
+            _state->get_sink_local_state(_sink->operator_id())->get_query_statistics_ptr());
+
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         auto& deps = get_upstream_dependency(op->operator_id());
@@ -105,7 +108,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
         query_ctx->register_query_statistics(
-                _state->get_local_state(op->operator_id())->query_statistics_ptr());
+                _state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
 
     _block = doris::vectorized::Block::create_unique();
diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp
index ab49b02ad43..e12e4e08c52 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -40,10 +40,11 @@ void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) {
 }
 
 void QueryStatistics::merge(const QueryStatistics& other) {
-    scan_rows += other.scan_rows;
-    scan_bytes += other.scan_bytes;
-    int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed);
-    cpu_nanos += other_cpu_time;
+    scan_rows += other.scan_rows.load(std::memory_order_relaxed);
+    scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
+    cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
+    shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
+    shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
 
     int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
     if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -85,6 +86,8 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
     statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
     statistics->__set_current_used_memory_bytes(
             current_used_memory_bytes.load(std::memory_order_relaxed));
+    statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
+    statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
 }
 
 void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index abaf0a251a8..8a18a152e4f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -65,7 +65,9 @@ public:
               cpu_nanos(0),
               returned_rows(0),
               max_peak_memory_bytes(0),
-              current_used_memory_bytes(0) {}
+              current_used_memory_bytes(0),
+              shuffle_send_bytes(0),
+              shuffle_send_rows(0) {}
     virtual ~QueryStatistics();
 
     void merge(const QueryStatistics& other);
@@ -82,6 +84,14 @@ public:
         this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
     }
 
+    void add_shuffle_send_bytes(int64_t delta_bytes) {
+        this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
+    }
+
+    void add_shuffle_send_rows(int64_t delta_rows) {
+        this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
+    }
+
     NodeStatistics* add_nodes_statistics(int64_t node_id) {
         NodeStatistics* nodeStatistics = nullptr;
         auto iter = _nodes_statistics_map.find(node_id);
@@ -115,8 +125,10 @@ public:
     void clear() {
         scan_rows.store(0, std::memory_order_relaxed);
         scan_bytes.store(0, std::memory_order_relaxed);
-
         cpu_nanos.store(0, std::memory_order_relaxed);
+        shuffle_send_bytes.store(0, std::memory_order_relaxed);
+        shuffle_send_rows.store(0, std::memory_order_relaxed);
+
         returned_rows = 0;
         max_peak_memory_bytes.store(0, std::memory_order_relaxed);
         clearNodeStatistics();
@@ -152,6 +164,9 @@ private:
     NodeStatisticsMap _nodes_statistics_map;
     bool _collected = false;
     std::atomic<int64_t> current_used_memory_bytes;
+
+    std::atomic<int64_t> shuffle_send_bytes;
+    std::atomic<int64_t> shuffle_send_rows;
 };
 using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 79ffebcc21e..35900964274 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -765,6 +765,9 @@ Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest,
         COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
         COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+        _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
+                                                                    num_receivers);
+        _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
     }
 
     return Status::OK();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index 5c122c98965..c8915966725 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -88,6 +88,10 @@ public class AuditEvent {
     public String stmt = "";
     @AuditField(value = "CpuTimeMS")
     public long cpuTimeMs = -1;
+    @AuditField(value = "ShuffleSendBytes")
+    public long shuffleSendBytes = -1;
+    @AuditField(value = "ShuffleSendRows")
+    public long shuffleSendRows = -1;
     @AuditField(value = "SqlHash")
     public String sqlHash = "";
     @AuditField(value = "peakMemoryBytes")
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 3c5d7fc8bf1..7bec1d1954a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -70,6 +70,8 @@ public class WorkloadRuntimeStatusMgr {
                     auditEvent.scanBytes = queryStats.scan_bytes;
                     auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
                     auditEvent.cpuTimeMs = queryStats.cpu_ms;
+                    auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
+                    auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
                 }
                 Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
             }
@@ -139,6 +141,7 @@ public class WorkloadRuntimeStatusMgr {
         } else {
             long currentTime = System.currentTimeMillis();
             for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+                LOG.debug("queryId={}, shuffleSendBytes={}", entry.getKey(), entry.getValue().shuffle_send_bytes);
                 queryIdMap.put(entry.getKey(), entry.getValue());
                 queryLastReportTime.put(entry.getKey(), currentTime);
             }
@@ -175,6 +178,8 @@ public class WorkloadRuntimeStatusMgr {
         dst.scan_rows += src.scan_rows;
         dst.scan_bytes += src.scan_bytes;
         dst.cpu_ms += src.cpu_ms;
+        dst.shuffle_send_bytes += src.shuffle_send_bytes;
+        dst.shuffle_send_rows += src.shuffle_send_rows;
         if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
             dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index 0839ae56a67..27f65ed7680 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -46,6 +46,8 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
             new Column("ScanBytes", PrimitiveType.BIGINT),
             new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
             new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
+            new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
+            new Column("ShuffleSendRows", PrimitiveType.BIGINT),
             new Column("Database", ScalarType.createStringType()),
             new Column("FrontendInstance", ScalarType.createStringType()),
             new Column("Sql", ScalarType.createStringType()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e4768660698..21505586491 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -441,6 +441,8 @@ public class MetadataGenerator {
             trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
             trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes));
             trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes));
+            trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes));
+            trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows));
         } else {
             trow.addToColumnValue(new TCell().setLongVal(0L));
             trow.addToColumnValue(new TCell().setLongVal(0L));
@@ -448,6 +450,8 @@ public class MetadataGenerator {
             trow.addToColumnValue(new TCell().setLongVal(0L));
             trow.addToColumnValue(new TCell().setLongVal(0L));
             trow.addToColumnValue(new TCell().setLongVal(0L));
+            trow.addToColumnValue(new TCell().setLongVal(0L));
+            trow.addToColumnValue(new TCell().setLongVal(0L));
         }
 
         if (queryInfo.getConnectContext() != null) {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7b65103c581..d96ee346148 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,8 @@ struct TQueryStatistics {
     5: optional i64 max_peak_memory_bytes
     6: optional i64 current_used_memory_bytes
     7: optional i64 workload_group_id
+    8: optional i64 shuffle_send_bytes
+    9: optional i64 shuffle_send_rows
 }
 
 struct TReportWorkloadRuntimeStatusParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to