This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d2400d1d7bf [feature](profile) profilev2 distinguish Sink and Operator in pipelineX (#25491) d2400d1d7bf is described below commit d2400d1d7bfbc1e211dd784edbcdf306aae84cab Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed Oct 18 13:12:29 2023 +0800 [feature](profile) profilev2 distinguish Sink and Operator in pipelineX (#25491) * update * update --- be/src/pipeline/exec/join_probe_operator.cpp | 2 +- .../doris/common/util/ProfileStatistics.java | 122 ++++++++++++++++++--- .../apache/doris/common/util/RuntimeProfile.java | 4 +- 3 files changed, 108 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 63074bed70c..8ae888f7f06 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -41,7 +41,7 @@ Status JoinProbeLocalState<DependencyType, Derived>::init(RuntimeState* state, _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime"); _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer"); _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock"); - _probe_rows_counter = ADD_COUNTER(Base::profile(), "ProbeRows", TUnit::UNIT); + _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1); return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java index 0d4e8ced886..df4cb0cb015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java @@ -25,13 +25,67 @@ import java.util.HashMap; * Used for collecting information obtained from the profile. */ public class ProfileStatistics { + class PipelineXStatistics { + ArrayList<String> sinkInfos; + ArrayList<String> infos; + + int sinkInstance = 0; + int operatorInstance = 0; + + PipelineXStatistics() { + sinkInfos = new ArrayList<String>(); + infos = new ArrayList<String>(); + } + + void add(boolean isSink, String str) { + if (isSink) { + sinkInfos.add(str); + } else { + infos.add(str); + } + } + + void updateInstance(boolean isSink, int instance) { + if (isSink) { + sinkInstance = Math.max(sinkInstance, instance); + } else { + operatorInstance = Math.max(operatorInstance, instance); + } + } + + String to_String() { + return null; + } + + void getInfo(ArrayList<String> infos, String prefix, StringBuilder str) { + Collections.sort(infos); + for (String info : infos) { + str.append(prefix + info + "\n"); + } + } + + void getOperator(String prefix, StringBuilder str) { + str.append(prefix + "Instance num : " + operatorInstance + "\n"); + getInfo(infos, prefix, str); + } + + void getSink(String prefix, StringBuilder str) { + str.append(prefix + "Instance num : " + sinkInstance + "\n"); + getInfo(sinkInfos, prefix, str); + } + } + // Record statistical information based on nodeid. private HashMap<Integer, ArrayList<String>> statisticalInfo; + private HashMap<Integer, Integer> statisticalInfoInstance; + + // Record statistical information based on nodeid(use in pipelineX). + private HashMap<Integer, PipelineXStatistics> pipelineXStatisticalInfo; // Record statistical information based on fragment ID. // "Currently used to record sink nodes. private HashMap<Integer, ArrayList<String>> fragmentInfo; - + private HashMap<Integer, Integer> fragmentInfoInstance; private int fragmentId; private boolean isDataSink; @@ -40,62 +94,96 @@ public class ProfileStatistics { public ProfileStatistics(boolean isPipelineX) { statisticalInfo = new HashMap<Integer, ArrayList<String>>(); + statisticalInfoInstance = new HashMap<Integer, Integer>(); + fragmentInfo = new HashMap<Integer, ArrayList<String>>(); + pipelineXStatisticalInfo = new HashMap<Integer, PipelineXStatistics>(); + fragmentInfoInstance = new HashMap<Integer, Integer>(); fragmentId = 0; isDataSink = false; this.isPipelineX = isPipelineX; } - private void addPlanNodeInfo(int id, String info) { + private void addPipelineXPlanNodeInfo(boolean isSink, int id, String info, int instance) { + if (!pipelineXStatisticalInfo.containsKey(id)) { + pipelineXStatisticalInfo.put(id, new PipelineXStatistics()); + } + pipelineXStatisticalInfo.get(id).add(isSink, info); + pipelineXStatisticalInfo.get(id).updateInstance(isSink, instance); + } + + private void addPlanNodeInfo(int id, String info, int instance) { if (!statisticalInfo.containsKey(id)) { statisticalInfo.put(id, new ArrayList<String>()); + statisticalInfoInstance.put(id, new Integer(0)); } statisticalInfo.get(id).add(info); + int ins = statisticalInfoInstance.get(id); + ins = Math.max(ins, instance); + statisticalInfoInstance.put(id, ins); } - private void addDataSinkInfo(String info) { + private void addDataSinkInfo(String info, int instance) { if (fragmentInfo.get(fragmentId) == null) { fragmentInfo.put(fragmentId, new ArrayList<String>()); + fragmentInfoInstance.put(fragmentId, new Integer(0)); } fragmentInfo.get(fragmentId).add(info); + int ins = fragmentInfoInstance.get(fragmentId); + ins = Math.max(ins, instance); + fragmentInfoInstance.put(fragmentId, ins); } - public void addInfoFromProfile(RuntimeProfile profile, String name, String info) { + public void addInfoFromProfile(RuntimeProfile profile, String name, String info, int instance) { if (isPipelineX) { - if (profile.sinkOperator()) { - name = name + "(Sink)"; - } else { - name = name + "(Operator)"; - } - addPlanNodeInfo(profile.nodeId(), name + ": " + info); + addPipelineXPlanNodeInfo(profile.sinkOperator(), profile.nodeId(), name + ": " + info, instance); } else { if (isDataSink) { - addDataSinkInfo(name + ": " + info); + addDataSinkInfo(name + ": " + info, instance); } else { - addPlanNodeInfo(profile.nodeId(), name + ": " + info); + addPlanNodeInfo(profile.nodeId(), name + ": " + info, instance); } } } public boolean hasInfo(int id) { - return statisticalInfo.containsKey(id); + if (isPipelineX) { + return pipelineXStatisticalInfo.containsKey(id); + } else { + return statisticalInfo.containsKey(id); + } } public void getInfoById(int id, String prefix, StringBuilder str) { if (!hasInfo(id)) { return; } - ArrayList<String> infos = statisticalInfo.get(id); - Collections.sort(infos); - for (String info : infos) { - str.append(prefix + info + "\n"); + if (isPipelineX) { + getPipelineXInfoById(id, prefix, str); + } else { + ArrayList<String> infos = statisticalInfo.get(id); + str.append(prefix + "Instance num :" + statisticalInfoInstance.get(id) + "\n"); + Collections.sort(infos); + for (String info : infos) { + str.append(prefix + info + "\n"); + } } } + private void getPipelineXInfoById(int id, String prefix, StringBuilder str) { + PipelineXStatistics statistics = pipelineXStatisticalInfo.get(id); + str.append(prefix + "Operator: \n"); + statistics.getOperator(prefix + " ", str); + + str.append(prefix + "Sink: \n"); + statistics.getSink(prefix + " ", str); + } + public void getDataSinkInfo(int fragmentIdx, String prefix, StringBuilder str) { if (!fragmentInfo.containsKey(fragmentIdx)) { return; } + str.append(prefix + "Instance num :" + fragmentInfoInstance.get(fragmentIdx) + "\n"); ArrayList<String> infos = fragmentInfo.get(fragmentIdx); Collections.sort(infos); for (String info : infos) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 1ba7f66b692..3519ebce254 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -504,7 +504,7 @@ public class RuntimeProfile { String infoString = AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", " + MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + ", " + MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType()); - statistics.addInfoFromProfile(src, counterName, infoString); + statistics.addInfoFromProfile(src, counterName, infoString, rhsCounter.size() + 1); } } else { Counter newCounter = new Counter(counter.getType(), counter.getValue()); @@ -514,7 +514,7 @@ public class RuntimeProfile { } } String infoString = printCounter(newCounter.getValue(), newCounter.getType()); - statistics.addInfoFromProfile(src, counterName, infoString); + statistics.addInfoFromProfile(src, counterName, infoString, rhsCounter.size() + 1); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org