This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 836cda65d82 [refactor](profilev2) split merged profile to a single runtime profile to make the logic more clear (#27184) 836cda65d82 is described below commit 836cda65d8296af8d278b6f9614b885de0c62050 Author: yiguolei <676222...@qq.com> AuthorDate: Sun Nov 19 13:21:50 2023 +0800 [refactor](profilev2) split merged profile to a single runtime profile to make the logic more clear (#27184) --- be/src/exec/data_sink.h | 10 ++ be/src/exec/exec_node.cpp | 6 +- be/src/pipeline/exec/exchange_sink_operator.h | 36 +++---- be/src/util/runtime_profile.cpp | 4 +- be/src/vec/sink/async_writer_sink.h | 4 + be/src/vec/sink/group_commit_block_sink.cpp | 1 + be/src/vec/sink/multi_cast_data_stream_sink.h | 4 + be/src/vec/sink/vdata_stream_sender.cpp | 6 +- be/src/vec/sink/vdata_stream_sender.h | 1 - be/src/vec/sink/vmemory_scratch_sink.cpp | 1 + be/src/vec/sink/vresult_sink.cpp | 4 + be/src/vec/sink/vtablet_sink_v2.cpp | 1 + .../doris/common/profile/AggregatedProfile.java | 41 ++++++++ .../doris/common/profile/ExecutionProfile.java | 25 +++++ .../org/apache/doris/common/profile/Profile.java | 102 ++++++++++++------ .../doris/common/profile/SummaryProfile.java | 19 +++- .../apache/doris/common/util/ProfileManager.java | 47 +++------ .../apache/doris/common/util/RuntimeProfile.java | 117 ++++++--------------- .../apache/doris/load/loadv2/LoadLoadingTask.java | 2 +- .../org/apache/doris/nereids/txn/Transaction.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 20 ++-- 21 files changed, 269 insertions(+), 184 deletions(-) diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 285653c43b8..16527616386 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -126,6 +126,16 @@ protected: // Maybe this will be transferred to BufferControlBlock. std::shared_ptr<QueryStatistics> _query_statistics; + + RuntimeProfile::Counter* _exec_timer = nullptr; + RuntimeProfile::Counter* _blocks_sent_counter = nullptr; + RuntimeProfile::Counter* _output_rows_counter = nullptr; + + void init_sink_common_profile() { + _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); + _output_rows_counter = ADD_COUNTER_WITH_LEVEL(_profile, "OutputRows", TUnit::UNIT, 1); + _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "OutputBlockCount", TUnit::UNIT, 1); + } }; } // namespace doris diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4489f65778c..c6bf3a5aa31 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -127,11 +127,11 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExecNode::prepare(RuntimeState* state) { DCHECK(_runtime_profile.get() != nullptr); _exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1); - _rows_returned_counter = - ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned", TUnit::UNIT, 1); + _rows_returned_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "OutputRows", TUnit::UNIT, 1); _output_bytes_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "OutputBytes", TUnit::BYTES, 1); - _block_count_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlockCount", TUnit::UNIT, 1); + _block_count_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "OutputBlockCount", TUnit::UNIT, 1); _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); _rows_returned_rate = runtime_profile()->add_derived_counter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 9dc670cd668..48189c8eb4a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -197,27 +197,27 @@ private: friend class vectorized::BlockSerializer<ExchangeSinkLocalState>; std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer; - RuntimeProfile::Counter* _serialize_batch_timer; - RuntimeProfile::Counter* _compress_timer; - RuntimeProfile::Counter* _brpc_send_timer; - RuntimeProfile::Counter* _brpc_wait_timer; - RuntimeProfile::Counter* _bytes_sent_counter; - RuntimeProfile::Counter* _uncompressed_bytes_counter; - RuntimeProfile::Counter* _local_sent_rows; - RuntimeProfile::Counter* _local_send_timer; - RuntimeProfile::Counter* _split_block_hash_compute_timer; - RuntimeProfile::Counter* _split_block_distribute_by_channel_timer; - RuntimeProfile::Counter* _blocks_sent_counter; + RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + RuntimeProfile::Counter* _compress_timer = nullptr; + RuntimeProfile::Counter* _brpc_send_timer = nullptr; + RuntimeProfile::Counter* _brpc_wait_timer = nullptr; + RuntimeProfile::Counter* _bytes_sent_counter = nullptr; + RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr; + RuntimeProfile::Counter* _local_sent_rows = nullptr; + RuntimeProfile::Counter* _local_send_timer = nullptr; + RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr; + RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr; + RuntimeProfile::Counter* _blocks_sent_counter = nullptr; // Throughput per total time spent in sender - RuntimeProfile::Counter* _overall_throughput; + RuntimeProfile::Counter* _overall_throughput = nullptr; // Used to counter send bytes under local data exchange - RuntimeProfile::Counter* _local_bytes_send_counter; - RuntimeProfile::Counter* _merge_block_timer; - RuntimeProfile::Counter* _memory_usage_counter; - RuntimeProfile::Counter* _peak_memory_usage_counter; + RuntimeProfile::Counter* _local_bytes_send_counter = nullptr; + RuntimeProfile::Counter* _merge_block_timer = nullptr; + RuntimeProfile::Counter* _memory_usage_counter = nullptr; + RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; - RuntimeProfile::Counter* _wait_queue_timer; - RuntimeProfile::Counter* _wait_broadcast_buffer_timer; + RuntimeProfile::Counter* _wait_queue_timer = nullptr; + RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr; std::vector<RuntimeProfile::Counter*> _wait_channel_timer; // Sender instance id, unique within a fragment. diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 345035a5c99..70e95dee287 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -49,8 +49,10 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile _metadata(-1), _timestamp(-1), _is_averaged_profile(is_averaged_profile), - _counter_total_time(TUnit::TIME_NS, 0, 1), + _counter_total_time(TUnit::TIME_NS, 0, 3), _local_time_percent(0) { + // TotalTime counter has level3 to disable it from plan profile, because + // it contains its child running time, we use exec time instead. _counter_map["TotalTime"] = &_counter_total_time; } diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h index 1d566036e12..a7e30f0bcb9 100644 --- a/be/src/vec/sink/async_writer_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -65,6 +65,7 @@ public: title << _name << " (frag_id=" << state->fragment_instance_id() << ")"; // create profile _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + init_sink_common_profile(); return Status::OK(); } @@ -80,6 +81,9 @@ public: } Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override { + SCOPED_TIMER(_exec_timer); + COUNTER_UPDATE(_blocks_sent_counter, 1); + COUNTER_UPDATE(_output_rows_counter, block->rows()); return _writer->append_block(*block); } diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index ddff0cf3304..e2f7480468d 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -56,6 +56,7 @@ Status GroupCommitBlockSink::prepare(RuntimeState* state) { // profile must add to state's object pool _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); + init_sink_common_profile(); _mem_tracker = std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id())); SCOPED_TIMER(_profile->total_time_counter()); diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h index 852c08397a9..b2229142837 100644 --- a/be/src/vec/sink/multi_cast_data_stream_sink.h +++ b/be/src/vec/sink/multi_cast_data_stream_sink.h @@ -27,11 +27,15 @@ public: MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>& streamer) : DataSink(streamer->row_desc()), _multi_cast_data_streamer(streamer) { _profile = _multi_cast_data_streamer->profile(); + init_sink_common_profile(); }; ~MultiCastDataStreamSink() override = default; Status send(RuntimeState* state, Block* block, bool eos = false) override { + SCOPED_TIMER(_exec_timer); + COUNTER_UPDATE(_blocks_sent_counter, 1); + COUNTER_UPDATE(_output_rows_counter, block->rows()); static_cast<void>(_multi_cast_data_streamer->push(state, block, eos)); return Status::OK(); }; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 0fc2fba38ef..128ad0b37f8 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -431,6 +431,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) { std::string title = fmt::format("VDataStreamSender (dst_id={}, dst_fragments=[{}])", _dest_node_id, instances); _profile = _pool->add(new RuntimeProfile(title)); + init_sink_common_profile(); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker = std::make_unique<MemTracker>("VDataStreamSender:" + print_id(state->fragment_instance_id())); @@ -464,7 +465,6 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _split_block_distribute_by_channel_timer = ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime"); _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime"); - _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(profile(), "BlocksSent", TUnit::UNIT, 1); _overall_throughput = profile()->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter, @@ -507,6 +507,8 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_TIMER(_exec_timer); + COUNTER_UPDATE(_output_rows_counter, block->rows()); _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto channel : _channels) { @@ -641,6 +643,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(_exec_timer); _serializer.reset_block(); Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { @@ -653,6 +656,7 @@ Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { } Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(_exec_timer); if (_closed) { return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 8a7e32c2b2b..04587983540 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -204,7 +204,6 @@ protected: RuntimeProfile::Counter* _local_send_timer {}; RuntimeProfile::Counter* _split_block_hash_compute_timer {}; RuntimeProfile::Counter* _split_block_distribute_by_channel_timer {}; - RuntimeProfile::Counter* _blocks_sent_counter {}; RuntimeProfile::Counter* _merge_block_timer {}; RuntimeProfile::Counter* _memory_usage_counter {}; RuntimeProfile::Counter* _peak_memory_usage_counter {}; diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp index d4f0d4521c0..58c576aa8f6 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.cpp +++ b/be/src/vec/sink/vmemory_scratch_sink.cpp @@ -70,6 +70,7 @@ Status MemoryScratchSink::prepare(RuntimeState* state) { title << "VMemoryScratchSink (frag_id=" << fragment_instance_id << ")"; // create profile _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + init_sink_common_profile(); return Status::OK(); } diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index edccadc3202..c6e7e7b87d3 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -83,6 +83,7 @@ Status VResultSink::prepare(RuntimeState* state) { fragment_instance_id.hi, fragment_instance_id.lo); // create profile _profile = state->obj_pool()->add(new RuntimeProfile(title)); + init_sink_common_profile(); // prepare output_expr RETURN_IF_ERROR(prepare_exprs(state)); @@ -133,6 +134,9 @@ Status VResultSink::second_phase_fetch_data(RuntimeState* state, Block* final_bl } Status VResultSink::send(RuntimeState* state, Block* block, bool eos) { + SCOPED_TIMER(_exec_timer); + COUNTER_UPDATE(_blocks_sent_counter, 1); + COUNTER_UPDATE(_output_rows_counter, block->rows()); if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL); RETURN_IF_ERROR(second_phase_fetch_data(state, block)); diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 9385bd93202..bbba4150298 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -54,6 +54,7 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) { } Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(_exec_timer); if (_closed) { return _close_status; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java new file mode 100644 index 00000000000..f8481b21e65 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.util.RuntimeProfile; + +import java.util.Map; + +/** +* AggregatedProfile is part of a query profile. +* It contains the aggregated information of a query. +*/ +public class AggregatedProfile { + + public static final String PROFILE_NAME = "MergedProfile"; + private ExecutionProfile executionProfile; + + public AggregatedProfile(RuntimeProfile rootProfile, ExecutionProfile executionProfile) { + this.executionProfile = executionProfile; + } + + public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { + return executionProfile.getAggregatedFragmentsProfile(planNodeMap); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 70ff5da18f3..d975bd781da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -18,6 +18,7 @@ package org.apache.doris.common.profile; import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Pair; import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.RuntimeProfile; @@ -30,7 +31,9 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -82,6 +85,24 @@ public class ExecutionProfile { this.queryId = queryId; } + public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { + RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); + for (int i = 0; i < fragmentProfiles.size(); ++i) { + RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i); + RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); + fragmentsProfile.addChild(newFragmentProfile); + List<RuntimeProfile> allInstanceProfiles = new ArrayList<RuntimeProfile>(); + for (Pair<RuntimeProfile, Boolean> runtimeProfile : oldFragmentProfile.getChildList()) { + allInstanceProfiles.add(runtimeProfile.first); + } + RuntimeProfile mergedInstanceProfile = new RuntimeProfile("Instance" + "(instance_num=" + + allInstanceProfiles.size() + ")", allInstanceProfiles.get(0).nodeId()); + newFragmentProfile.addChild(mergedInstanceProfile); + RuntimeProfile.mergeProfiles(allInstanceProfiles, mergedInstanceProfile, planNodeMap); + } + return fragmentsProfile; + } + public RuntimeProfile getExecutionProfile() { return executionProfile; } @@ -90,6 +111,10 @@ public class ExecutionProfile { return loadChannelProfile; } + public List<RuntimeProfile> getFragmentProfiles() { + return fragmentProfiles; + } + public void addToProfileAsChild(RuntimeProfile rootProfile) { rootProfile.addChild(executionProfile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 2d4424177a3..3b3464d96df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -21,34 +21,33 @@ import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.planner.Planner; -import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; -import java.util.List; import java.util.Map; /** - * Profile is a class to record the execution time of a query. - * It has the following structure: - * root profile: - * // summary of this profile, such as start time, end time, query id, etc. - * [SummaryProfile] - * // each execution profile is a complete execution of a query, a job may contain multiple queries. - * [List<ExecutionProfile>] + * Profile is a class to record the execution time of a query. It has the + * following structure: root profile: // summary of this profile, such as start + * time, end time, query id, etc. [SummaryProfile] // each execution profile is + * a complete execution of a query, a job may contain multiple queries. + * [List<ExecutionProfile>] * - * SummaryProfile: - * Summary: - * Execution Summary: + * SummaryProfile: Summary: Execution Summary: * - * ExecutionProfile: - * Fragment 0: - * Fragment 1: - * ... + * + * ExecutionProfile: Fragment 0: Fragment 1: ... */ public class Profile { + private static final Logger LOG = LogManager.getLogger(Profile.class); private RuntimeProfile rootProfile; private SummaryProfile summaryProfile; - private List<ExecutionProfile> executionProfiles = Lists.newArrayList(); + private AggregatedProfile aggregatedProfile; + private ExecutionProfile executionProfile; private boolean isFinished; + private Map<Integer, String> planNodeMap; public Profile(String name, boolean isEnable) { this.rootProfile = new RuntimeProfile(name); @@ -57,29 +56,70 @@ public class Profile { this.isFinished = !isEnable; } - public void addExecutionProfile(ExecutionProfile executionProfile) { - this.executionProfiles.add(executionProfile); - executionProfile.addToProfileAsChild(rootProfile); + public void setExecutionProfile(ExecutionProfile executionProfile) { + if (executionProfile == null) { + LOG.warn("try to set a null excecution profile, it is abnormal", new Exception()); + return; + } + this.executionProfile = executionProfile; + this.executionProfile.addToProfileAsChild(rootProfile); + this.aggregatedProfile = new AggregatedProfile(rootProfile, executionProfile); } public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished, int profileLevel, Planner planner, boolean isPipelineX) { - if (this.isFinished) { - return; - } - summaryProfile.update(summaryInfo); - for (ExecutionProfile executionProfile : executionProfiles) { + try { + if (this.isFinished) { + return; + } + if (executionProfile == null) { + // Sometimes execution profile is not set + return; + } + summaryProfile.update(summaryInfo); executionProfile.update(startTime, isFinished); + rootProfile.computeTimeInProfile(); + // Nerids native insert not set planner, so it is null + if (planner != null) { + this.planNodeMap = planner.getExplainStringMap(); + } + rootProfile.setIsPipelineX(isPipelineX); + ProfileManager.getInstance().pushProfile(this); + this.isFinished = isFinished; + } catch (Throwable t) { + LOG.warn("update profile failed", t); + throw t; } - rootProfile.computeTimeInProfile(); - rootProfile.setFragmentPlanInfo(planner); - rootProfile.setProfileLevel(profileLevel); - rootProfile.setIsPipelineX(isPipelineX); - ProfileManager.getInstance().pushProfile(rootProfile); - this.isFinished = isFinished; + } + + public RuntimeProfile getRootProfile() { + return this.rootProfile; } public SummaryProfile getSummaryProfile() { return summaryProfile; } + + public String getProfileByLevel() { + StringBuilder builder = new StringBuilder(); + // add summary to builder + summaryProfile.prettyPrint(builder); + LOG.info(builder.toString()); + builder.append("\n MergedProfile \n"); + aggregatedProfile.getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " "); + try { + builder.append("\n"); + executionProfile.getExecutionProfile().prettyPrint(builder, ""); + LOG.info(builder.toString()); + } catch (Throwable aggProfileException) { + LOG.warn("build merged simple profile failed", aggProfileException); + builder.append("build merged simple profile failed"); + } + return builder.toString(); + } + + public String getProfileBrief() { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + return gson.toJson(rootProfile.toBrief()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 856e575e91f..c478ea0fd29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -33,6 +33,7 @@ import java.util.Map; */ public class SummaryProfile { // Summary + public static final String SUMMARY_PROFILE_NAME = "Summary"; public static final String PROFILE_ID = "Profile ID"; public static final String DORIS_VERSION = "Doris Version"; public static final String TASK_TYPE = "Task Type"; @@ -53,6 +54,7 @@ public class SummaryProfile { public static final String WORKLOAD_GROUP = "Workload Group"; // Execution Summary + public static final String EXECUTION_SUMMARY_PROFILE_NAME = "Execution Summary"; public static final String ANALYSIS_TIME = "Analysis Time"; public static final String JOIN_REORDER_TIME = "JoinReorder Time"; public static final String CREATE_SINGLE_NODE_TIME = "CreateSingleNode Time"; @@ -140,8 +142,8 @@ public class SummaryProfile { private long queryWriteResultConsumeTime = 0; public SummaryProfile(RuntimeProfile rootProfile) { - summaryProfile = new RuntimeProfile("Summary"); - executionSummaryProfile = new RuntimeProfile("Execution Summary"); + summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME); + executionSummaryProfile = new RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME); init(); rootProfile.addChild(summaryProfile); rootProfile.addChild(executionSummaryProfile); @@ -156,6 +158,19 @@ public class SummaryProfile { } } + public void prettyPrint(StringBuilder builder) { + summaryProfile.prettyPrint(builder, ""); + executionSummaryProfile.prettyPrint(builder, ""); + } + + public Map<String, String> getAsInfoStings() { + Map<String, String> infoStrings = Maps.newHashMap(); + for (String header : SummaryProfile.SUMMARY_KEYS) { + infoStrings.put(header, summaryProfile.getInfoString(header)); + } + return infoStrings; + } + public void update(Map<String, String> summaryInfo) { updateSummaryProfile(summaryInfo); updateExecutionSummaryProfile(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index c4d593d8978..72122304703 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -21,8 +21,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.Pair; import org.apache.doris.common.profile.MultiProfileTreeBuilder; +import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; import org.apache.doris.common.profile.SummaryProfile; @@ -31,8 +31,6 @@ import org.apache.doris.nereids.stats.StatsErrorEstimator; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -66,13 +64,11 @@ public class ProfileManager { } public static class ProfileElement { - public ProfileElement(RuntimeProfile profile) { + public ProfileElement(Profile profile) { this.profile = profile; } - private final RuntimeProfile profile; - // cache the result of getProfileContent method - private volatile String profileContent = null; + private final Profile profile; public Map<String, String> infoStrings = Maps.newHashMap(); public MultiProfileTreeBuilder builder = null; public String errMsg = ""; @@ -81,22 +77,14 @@ public class ProfileManager { // lazy load profileContent because sometimes profileContent is very large public String getProfileContent() { - - // no need to lock because the possibility of concurrent read is very low - if (profileContent == null) { - // Simple profile will change the structure of the profile. - try { - profileContent = profile.getProfileByLevel(); - } catch (Exception e) { - LOG.warn("profile get error : " + e.toString()); - } - } - return profileContent; + // Not cache the profile content because it may change during insert + // into select statement, we need use this to check process. + // And also, cache the content will double usage of the memory in FE. + return profile.getProfileByLevel(); } public String getProfileBrief() { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - return gson.toJson(profile.toBrief()); + return profile.getProfileBrief(); } public double getError() { @@ -136,21 +124,10 @@ public class ProfileManager { queryIdToProfileMap = new ConcurrentHashMap<>(); } - public ProfileElement createElement(RuntimeProfile profile) { + public ProfileElement createElement(Profile profile) { ProfileElement element = new ProfileElement(profile); - RuntimeProfile summaryProfile = profile.getChildList().get(0).first; - for (String header : SummaryProfile.SUMMARY_KEYS) { - element.infoStrings.put(header, summaryProfile.getInfoString(header)); - } - List<Pair<RuntimeProfile, Boolean>> childList = summaryProfile.getChildList(); - if (!childList.isEmpty()) { - RuntimeProfile executionProfile = childList.get(0).first; - for (String header : SummaryProfile.EXECUTION_SUMMARY_KEYS) { - element.infoStrings.put(header, executionProfile.getInfoString(header)); - } - } - - MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile); + element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings()); + MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile.getRootProfile()); try { builder.build(); } catch (Exception e) { @@ -162,7 +139,7 @@ public class ProfileManager { return element; } - public void pushProfile(RuntimeProfile profile) { + public void pushProfile(Profile profile) { if (profile == null) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 9f1b8234ce9..35ff607acc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -20,7 +20,6 @@ package org.apache.doris.common.util; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.profile.SummaryProfile; -import org.apache.doris.planner.Planner; import org.apache.doris.thrift.TCounter; import org.apache.doris.thrift.TRuntimeProfileNode; import org.apache.doris.thrift.TRuntimeProfileTree; @@ -49,7 +48,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class RuntimeProfile { private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class); public static String ROOT_COUNTER = ""; - public static int FRAGMENT_DEPTH = 3; public static String MAX_TIME_PRE = "max "; public static String MIN_TIME_PRE = "min "; public static String AVG_TIME_PRE = "avg "; @@ -83,8 +81,6 @@ public class RuntimeProfile { private Boolean isPipelineX = false; private Boolean isSinkOperator = false; - private int profileLevel = 3; - private Map<Integer, String> planNodeMap = null; private int nodeid = -1; public RuntimeProfile(String name) { @@ -449,20 +445,10 @@ public class RuntimeProfile { } } - private static LinkedList<RuntimeProfile> getChildListFromLists(int idx, LinkedList<RuntimeProfile> rhs) { - LinkedList<RuntimeProfile> ret = new LinkedList<RuntimeProfile>(); - for (RuntimeProfile profile : rhs) { - if (idx < profile.childList.size()) { - ret.add(profile.childList.get(idx).first); - } - } - return ret; - } - - private static LinkedList<Counter> getCounterListFromLists(String counterName, LinkedList<RuntimeProfile> rhs) { - LinkedList<Counter> ret = new LinkedList<Counter>(); - for (RuntimeProfile profile : rhs) { - ret.add(profile.counterMap.get(counterName)); + private static List<RuntimeProfile> getChildListFromLists(String profileName, List<RuntimeProfile> profiles) { + List<RuntimeProfile> ret = new ArrayList<RuntimeProfile>(); + for (RuntimeProfile profile : profiles) { + ret.add(profile.getChildMap().get(profileName)); } return ret; } @@ -473,70 +459,45 @@ public class RuntimeProfile { return builder.toString(); } - public String getProfileByLevel() { - if (this.profileLevel == 3) { - return toString(); - } - if (this.planNodeMap == null) { - return toString(); - } - RuntimeProfile simpleProfile = new RuntimeProfile("SimpleProfile"); - getSimpleProfile(0, simpleProfile, this.planNodeMap); - return simpleProfile.toString() + " \n \n " + toString(); - } - - public void getSimpleProfile(int depth, RuntimeProfile simpleProfile, Map<Integer, String> planNodeMap) { - if (depth == FRAGMENT_DEPTH) { - gettSimpleProfileFromMutiInstance(childList, simpleProfile, planNodeMap); - return; - } - for (int i = 0; i < childList.size(); i++) { - Pair<RuntimeProfile, Boolean> pair = childList.get(i); - RuntimeProfile profile = pair.first; - profile.getSimpleProfile(depth + 1, simpleProfile, planNodeMap); - } - } - - public static void gettSimpleProfileFromMutiInstance(LinkedList<Pair<RuntimeProfile, Boolean>> childList, + public static void mergeProfiles(List<RuntimeProfile> profiles, RuntimeProfile simpleProfile, Map<Integer, String> planNodeMap) { - RuntimeProfile oneProfile = childList.get(0).first; - int instanceNum = childList.size(); - RuntimeProfile mergedProfile = new RuntimeProfile("Instance" + "(" + instanceNum + ")", oneProfile.nodeid); - LinkedList<RuntimeProfile> other = new LinkedList<RuntimeProfile>(); - for (int i = 1; i < childList.size(); i++) { - other.add(childList.get(i).first); + mergeCounters(ROOT_COUNTER, profiles, simpleProfile); + if (profiles.size() < 1) { + return; } - simpleProfile.addChildWithCheck(mergedProfile, planNodeMap); - collecteProfile(oneProfile, other, mergedProfile, planNodeMap); - } - - public static void collecteProfile(RuntimeProfile src, LinkedList<RuntimeProfile> others, - RuntimeProfile simpleProfile, Map<Integer, String> planNodeMap) { - collecteProfileCounter(src, ROOT_COUNTER, others, simpleProfile); - for (int i = 0; i < src.childList.size(); i++) { - RuntimeProfile srcChild = src.childList.get(i).first; - LinkedList<RuntimeProfile> rhsChild = getChildListFromLists(i, others); - RuntimeProfile childProfile = new RuntimeProfile(srcChild.name, srcChild.nodeId()); - simpleProfile.addChildWithCheck(childProfile, planNodeMap); - collecteProfile(srcChild, rhsChild, childProfile, planNodeMap); + RuntimeProfile templateProfile = profiles.get(0); + for (int i = 0; i < templateProfile.childList.size(); i++) { + RuntimeProfile templateChildProfile = templateProfile.childList.get(i).first; + List<RuntimeProfile> allChilds = getChildListFromLists(templateChildProfile.name, profiles); + RuntimeProfile newCreatedMergedChildProfile = new RuntimeProfile(templateChildProfile.name, + templateChildProfile.nodeId()); + mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap); + // RuntimeProfile has at least one counter named TotalTime, should exclude it. + if (newCreatedMergedChildProfile.counterMap.size() > 1) { + simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap); + } } } - private static void collecteProfileCounter(RuntimeProfile src, String counterName, LinkedList<RuntimeProfile> rhs, + private static void mergeCounters(String counterName, List<RuntimeProfile> profiles, RuntimeProfile simpleProfile) { - Set<String> childCounterSet = src.childCounterMap.get(counterName); + if (profiles.size() == 0) { + return; + } + RuntimeProfile templateProfile = profiles.get(0); + Set<String> childCounterSet = templateProfile.childCounterMap.get(counterName); if (childCounterSet == null) { return; } - List<String> childCounterList = new LinkedList<>(childCounterSet); - for (String childCounterName : childCounterList) { - Counter counter = src.counterMap.get(childCounterName); - collecteProfileCounter(src, childCounterName, rhs, simpleProfile); + for (String childCounterName : childCounterSet) { + Counter counter = templateProfile.counterMap.get(childCounterName); + mergeCounters(childCounterName, profiles, simpleProfile); if (counter.getLevel() == 1) { - LinkedList<Counter> rhsCounter = getCounterListFromLists(childCounterName, rhs); - // String info = getMergeString(counter, rhsCounter); - AggCounter aggCounter = new AggCounter(counter.getType(), counter.getValue()); - aggCounter.addCounters(rhsCounter); + AggCounter aggCounter = new AggCounter(profiles.get(0).counterMap.get(childCounterName).getType(), 0); + for (RuntimeProfile profile : profiles) { + Counter orgCounter = profile.counterMap.get(childCounterName); + aggCounter.addCounter(orgCounter); + } simpleProfile.addCounter(childCounterName, aggCounter, ROOT_COUNTER); } } @@ -673,7 +634,7 @@ public class RuntimeProfile { childLock.writeLock().unlock(); } // insert plan node info to profile strinfo - if (!planNodeMap.containsKey(child.nodeId())) { + if (planNodeMap == null || !planNodeMap.containsKey(child.nodeId())) { return; } child.addPlanNodeInfos(planNodeMap.get(child.nodeId())); @@ -715,16 +676,6 @@ public class RuntimeProfile { computeTimeInProfile(this.counterTotalTime.getValue()); } - public void setProfileLevel(int profileLevel) { - this.profileLevel = profileLevel; - } - - public void setFragmentPlanInfo(Planner planner) { - if (planner != null) { - this.planNodeMap = planner.getExplainStringMap(); - } - } - private void computeTimeInProfile(long total) { if (total == 0) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 3bdbe52209b..d98ba2ba4a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -137,7 +137,7 @@ public class LoadLoadingTask extends LoadTask { Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); if (this.jobProfile != null) { - this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile()); + this.jobProfile.setExecutionProfile(curCoordinator.getExecutionProfile()); } curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java index 82031e32ed7..48f80ad907a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java @@ -99,7 +99,7 @@ public class Transaction { try { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); - executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); + executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ff2964feb88..770c876ed2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -893,10 +893,16 @@ public class StmtExecutor { if (!context.getSessionVariable().enableProfile()) { return; } - - profile.update(context.startTime, getSummaryInfo(isFinished), isFinished, - context.getSessionVariable().profileLevel, this.planner, - context.getSessionVariable().getEnablePipelineXEngine()); + // If any error happends in update profile, we should ignore this error + // and ensure the sql is finished normally. For example, if update profile + // failed, the insert stmt should be success + try { + profile.update(context.startTime, getSummaryInfo(isFinished), isFinished, + context.getSessionVariable().profileLevel, this.planner, + context.getSessionVariable().getEnablePipelineXEngine()); + } catch (Throwable t) { + LOG.warn("failed to update profile, ingore this error", t); + } } // Analyze one statement to structure in memory. @@ -1447,7 +1453,7 @@ public class StmtExecutor { } QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - profile.addExecutionProfile(coord.getExecutionProfile()); + profile.setExecutionProfile(coord.getExecutionProfile()); coordBase = coord; } @@ -1949,7 +1955,7 @@ public class StmtExecutor { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); - profile.addExecutionProfile(coord.getExecutionProfile()); + profile.setExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); @@ -2710,7 +2716,7 @@ public class StmtExecutor { } RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); - profile.addExecutionProfile(coord.getExecutionProfile()); + profile.setExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org