This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6c988e73ebf [Feature](profile)add shuffle send rows/bytes #30456 6c988e73ebf is described below commit 6c988e73ebf4ec2aac558a3cffc76f36baed06ad Author: wangbo <wan...@apache.org> AuthorDate: Sat Jan 27 17:09:17 2024 +0800 [Feature](profile)add shuffle send rows/bytes #30456 --- be/src/exec/data_sink.cpp | 16 ++++++++++++++++ be/src/exec/data_sink.h | 7 ++++++- be/src/exec/exec_node.cpp | 2 +- be/src/pipeline/exec/exchange_sink_buffer.h | 1 - be/src/pipeline/pipeline_x/operator.cpp | 4 +++- be/src/pipeline/pipeline_x/operator.h | 6 +++++- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 5 ++++- be/src/runtime/query_statistics.cpp | 11 +++++++---- be/src/runtime/query_statistics.h | 19 +++++++++++++++++-- be/src/vec/sink/vdata_stream_sender.cpp | 3 +++ .../org/apache/doris/plugin/audit/AuditEvent.java | 4 ++++ .../workloadschedpolicy/WorkloadRuntimeStatusMgr.java | 5 +++++ .../ActiveQueriesTableValuedFunction.java | 2 ++ .../apache/doris/tablefunction/MetadataGenerator.java | 4 ++++ gensrc/thrift/FrontendService.thrift | 2 ++ 15 files changed, 79 insertions(+), 12 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index c58bbdb2523..5809912f4a2 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -30,6 +30,8 @@ #include <string> #include "common/config.h" +#include "runtime/query_context.h" +#include "runtime/query_statistics.h" #include "vec/sink/async_writer_sink.h" #include "vec/sink/group_commit_block_sink.h" #include "vec/sink/multi_cast_data_stream_sink.h" @@ -44,6 +46,14 @@ namespace doris { class DescriptorTbl; class TExpr; +DataSink::DataSink(const RowDescriptor& desc) : _row_desc(desc) { + _query_statistics = std::make_shared<QueryStatistics>(); +} + +std::shared_ptr<QueryStatistics> DataSink::get_query_statistics_ptr() { + return _query_statistics; +} + Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs, const TPlanFragmentExecParams& params, @@ -175,6 +185,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (*sink != nullptr) { RETURN_IF_ERROR((*sink)->init(thrift_sink)); + if (state->get_query_ctx()) { + state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr()); + } } return Status::OK(); @@ -318,6 +331,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (*sink != nullptr) { RETURN_IF_ERROR((*sink)->init(thrift_sink)); RETURN_IF_ERROR((*sink)->prepare(state)); + if (state->get_query_ctx()) { + state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr()); + } } return Status::OK(); diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index c0b27e8ae90..e08d40ab023 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -40,6 +40,7 @@ class TDataSink; class TExpr; class TPipelineFragmentParams; class TOlapTableSink; +class QueryStatistics; namespace vectorized { class Block; @@ -48,7 +49,7 @@ class Block; // Superclass of all data sinks. class DataSink { public: - DataSink(const RowDescriptor& desc) : _row_desc(desc) {} + DataSink(const RowDescriptor& desc); virtual ~DataSink() {} virtual Status init(const TDataSink& thrift_sink); @@ -103,6 +104,8 @@ public: virtual bool can_write() { return true; } + std::shared_ptr<QueryStatistics> get_query_statistics_ptr(); + private: static bool _has_inverted_index_or_partial_update(TOlapTableSink sink); @@ -124,6 +127,8 @@ protected: _output_rows_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); } + + std::shared_ptr<QueryStatistics> _query_statistics = nullptr; }; } // namespace doris diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ac6e7eae9a0..368e94562a2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -255,7 +255,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, // Step 1 Create current ExecNode according to current thrift plan node. ExecNode* cur_exec_node = nullptr; RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node)); - if (cur_exec_node != nullptr) { + if (cur_exec_node != nullptr && state->get_query_ctx()) { state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics()); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index f0b55d528ec..c04d2973d5e 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -273,7 +273,6 @@ private: static constexpr int QUEUE_CAPACITY_FACTOR = 64; std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency; std::shared_ptr<Dependency> _finish_dependency; - QueryStatistics* _statistics = nullptr; std::atomic<bool> _should_stop {false}; }; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index db687865657..ef17ee70ed6 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -342,7 +342,9 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, RuntimeState* state) - : _parent(parent), _state(state) {} + : _parent(parent), _state(state) { + _query_statistics = std::make_shared<QueryStatistics>(); +} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) : _num_rows_returned(0), diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 1c076bd5a69..61afe40e126 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -111,7 +111,7 @@ public: // override in Scan MultiCastSink virtual RuntimeFilterDependency* filterdependency() { return nullptr; } - std::shared_ptr<QueryStatistics> query_statistics_ptr() { return _query_statistics; } + std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; } protected: friend class OperatorXBase; @@ -424,6 +424,8 @@ public: // override in exchange sink , AsyncWriterSink virtual Dependency* finishdependency() { return nullptr; } + std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; } + protected: DataSinkOperatorXBase* _parent = nullptr; RuntimeState* _state = nullptr; @@ -447,6 +449,8 @@ protected: RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; + + std::shared_ptr<QueryStatistics> _query_statistics = nullptr; }; class DataSinkOperatorXBase : public OperatorBase { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 9a88c417be0..f0b87ce8613 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -92,6 +92,9 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile(); + query_ctx->register_query_statistics( + _state->get_sink_local_state(_sink->operator_id())->get_query_statistics_ptr()); + for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); @@ -105,7 +108,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); query_ctx->register_query_statistics( - _state->get_local_state(op->operator_id())->query_statistics_ptr()); + _state->get_local_state(op->operator_id())->get_query_statistics_ptr()); } _block = doris::vectorized::Block::create_unique(); diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index ab49b02ad43..e12e4e08c52 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -40,10 +40,11 @@ void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) { } void QueryStatistics::merge(const QueryStatistics& other) { - scan_rows += other.scan_rows; - scan_bytes += other.scan_bytes; - int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed); - cpu_nanos += other_cpu_time; + scan_rows += other.scan_rows.load(std::memory_order_relaxed); + scan_bytes += other.scan_bytes.load(std::memory_order_relaxed); + cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed); + shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed); + shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed); int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed); if (other_peak_mem > this->max_peak_memory_bytes) { @@ -85,6 +86,8 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed)); statistics->__set_current_used_memory_bytes( current_used_memory_bytes.load(std::memory_order_relaxed)); + statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed)); + statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed)); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index abaf0a251a8..8a18a152e4f 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -65,7 +65,9 @@ public: cpu_nanos(0), returned_rows(0), max_peak_memory_bytes(0), - current_used_memory_bytes(0) {} + current_used_memory_bytes(0), + shuffle_send_bytes(0), + shuffle_send_rows(0) {} virtual ~QueryStatistics(); void merge(const QueryStatistics& other); @@ -82,6 +84,14 @@ public: this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed); } + void add_shuffle_send_bytes(int64_t delta_bytes) { + this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed); + } + + void add_shuffle_send_rows(int64_t delta_rows) { + this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed); + } + NodeStatistics* add_nodes_statistics(int64_t node_id) { NodeStatistics* nodeStatistics = nullptr; auto iter = _nodes_statistics_map.find(node_id); @@ -115,8 +125,10 @@ public: void clear() { scan_rows.store(0, std::memory_order_relaxed); scan_bytes.store(0, std::memory_order_relaxed); - cpu_nanos.store(0, std::memory_order_relaxed); + shuffle_send_bytes.store(0, std::memory_order_relaxed); + shuffle_send_rows.store(0, std::memory_order_relaxed); + returned_rows = 0; max_peak_memory_bytes.store(0, std::memory_order_relaxed); clearNodeStatistics(); @@ -152,6 +164,9 @@ private: NodeStatisticsMap _nodes_statistics_map; bool _collected = false; std::atomic<int64_t> current_used_memory_bytes; + + std::atomic<int64_t> shuffle_send_bytes; + std::atomic<int64_t> shuffle_send_rows; }; using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>; // It is used for collecting sub plan query statistics in DataStreamRecvr. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 79ffebcc21e..35900964274 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -765,6 +765,9 @@ Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time()); + _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * + num_receivers); + _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers); } return Status::OK(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java index 5c122c98965..c8915966725 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java @@ -88,6 +88,10 @@ public class AuditEvent { public String stmt = ""; @AuditField(value = "CpuTimeMS") public long cpuTimeMs = -1; + @AuditField(value = "ShuffleSendBytes") + public long shuffleSendBytes = -1; + @AuditField(value = "ShuffleSendRows") + public long shuffleSendRows = -1; @AuditField(value = "SqlHash") public String sqlHash = ""; @AuditField(value = "peakMemoryBytes") diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 3c5d7fc8bf1..7bec1d1954a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -70,6 +70,8 @@ public class WorkloadRuntimeStatusMgr { auditEvent.scanBytes = queryStats.scan_bytes; auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; auditEvent.cpuTimeMs = queryStats.cpu_ms; + auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; + auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; } Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); } @@ -139,6 +141,7 @@ public class WorkloadRuntimeStatusMgr { } else { long currentTime = System.currentTimeMillis(); for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) { + LOG.info("log2109 queryid={}, shuffle={}", entry.getKey(), entry.getValue().shuffle_send_bytes); queryIdMap.put(entry.getKey(), entry.getValue()); queryLastReportTime.put(entry.getKey(), currentTime); } @@ -175,6 +178,8 @@ public class WorkloadRuntimeStatusMgr { dst.scan_rows += src.scan_rows; dst.scan_bytes += src.scan_bytes; dst.cpu_ms += src.cpu_ms; + dst.shuffle_send_bytes += src.shuffle_send_bytes; + dst.shuffle_send_rows += src.shuffle_send_rows; if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) { dst.max_peak_memory_bytes = src.max_peak_memory_bytes; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index 0839ae56a67..27f65ed7680 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -46,6 +46,8 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio new Column("ScanBytes", PrimitiveType.BIGINT), new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), + new Column("ShuffleSendBytes", PrimitiveType.BIGINT), + new Column("ShuffleSendRows", PrimitiveType.BIGINT), new Column("Database", ScalarType.createStringType()), new Column("FrontendInstance", ScalarType.createStringType()), new Column("Sql", ScalarType.createStringType())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index e4768660698..21505586491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -441,6 +441,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes)); trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes)); trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes)); + trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes)); + trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows)); } else { trow.addToColumnValue(new TCell().setLongVal(0L)); trow.addToColumnValue(new TCell().setLongVal(0L)); @@ -448,6 +450,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setLongVal(0L)); trow.addToColumnValue(new TCell().setLongVal(0L)); trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); } if (queryInfo.getConnectContext() != null) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 7b65103c581..d96ee346148 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -406,6 +406,8 @@ struct TQueryStatistics { 5: optional i64 max_peak_memory_bytes 6: optional i64 current_used_memory_bytes 7: optional i64 workload_group_id + 8: optional i64 shuffle_send_bytes + 9: optional i64 shuffle_send_rows } struct TReportWorkloadRuntimeStatusParams { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org