This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d2400d1d7bf [feature](profile) profilev2 distinguish Sink and Operator in pipelineX (#25491)
d2400d1d7bf is described below

commit d2400d1d7bfbc1e211dd784edbcdf306aae84cab
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Wed Oct 18 13:12:29 2023 +0800

    [feature](profile) profilev2 distinguish Sink and Operator in pipelineX (#25491)
    
    * update
    
    * update
---
 be/src/pipeline/exec/join_probe_operator.cpp       |   2 +-
 .../doris/common/util/ProfileStatistics.java       | 122 ++++++++++++++++++---
 .../apache/doris/common/util/RuntimeProfile.java   |   4 +-
 3 files changed, 108 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/exec/join_probe_operator.cpp 
b/be/src/pipeline/exec/join_probe_operator.cpp
index 63074bed70c..8ae888f7f06 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -41,7 +41,7 @@ Status JoinProbeLocalState<DependencyType, 
Derived>::init(RuntimeState* state,
     _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
     _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
     _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
-    _probe_rows_counter = ADD_COUNTER(Base::profile(), "ProbeRows", 
TUnit::UNIT);
+    _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", 
TUnit::UNIT, 1);
 
     return Status::OK();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java
index 0d4e8ced886..df4cb0cb015 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java
@@ -25,13 +25,67 @@ import java.util.HashMap;
  * Used for collecting information obtained from the profile.
  */
 public class ProfileStatistics {
+    class PipelineXStatistics {
+        ArrayList<String> sinkInfos;
+        ArrayList<String> infos;
+
+        int sinkInstance = 0;
+        int operatorInstance = 0;
+
+        PipelineXStatistics() {
+            sinkInfos = new ArrayList<String>();
+            infos = new ArrayList<String>();
+        }
+
+        void add(boolean isSink, String str) {
+            if (isSink) {
+                sinkInfos.add(str);
+            } else {
+                infos.add(str);
+            }
+        }
+
+        void updateInstance(boolean isSink, int instance) {
+            if (isSink) {
+                sinkInstance = Math.max(sinkInstance, instance);
+            } else {
+                operatorInstance = Math.max(operatorInstance, instance);
+            }
+        }
+
+        String to_String() {
+            return null;
+        }
+
+        void getInfo(ArrayList<String> infos, String prefix, StringBuilder 
str) {
+            Collections.sort(infos);
+            for (String info : infos) {
+                str.append(prefix + info + "\n");
+            }
+        }
+
+        void getOperator(String prefix, StringBuilder str) {
+            str.append(prefix + "Instance num : " + operatorInstance + "\n");
+            getInfo(infos, prefix, str);
+        }
+
+        void getSink(String prefix, StringBuilder str) {
+            str.append(prefix + "Instance num : " + sinkInstance + "\n");
+            getInfo(sinkInfos, prefix, str);
+        }
+    }
+
     // Record statistical information based on nodeid.
     private HashMap<Integer, ArrayList<String>> statisticalInfo;
+    private HashMap<Integer, Integer> statisticalInfoInstance;
+
+    // Record statistical information based on nodeid(use in pipelineX).
+    private HashMap<Integer, PipelineXStatistics> pipelineXStatisticalInfo;
 
     // Record statistical information based on fragment ID.
     // "Currently used to record sink nodes.
     private HashMap<Integer, ArrayList<String>> fragmentInfo;
-
+    private HashMap<Integer, Integer> fragmentInfoInstance;
     private int fragmentId;
 
     private boolean isDataSink;
@@ -40,62 +94,96 @@ public class ProfileStatistics {
 
     public ProfileStatistics(boolean isPipelineX) {
         statisticalInfo = new HashMap<Integer, ArrayList<String>>();
+        statisticalInfoInstance = new HashMap<Integer, Integer>();
+
         fragmentInfo = new HashMap<Integer, ArrayList<String>>();
+        pipelineXStatisticalInfo = new HashMap<Integer, PipelineXStatistics>();
+        fragmentInfoInstance = new HashMap<Integer, Integer>();
         fragmentId = 0;
         isDataSink = false;
         this.isPipelineX = isPipelineX;
     }
 
-    private void addPlanNodeInfo(int id, String info) {
+    private void addPipelineXPlanNodeInfo(boolean isSink, int id, String info, 
int instance) {
+        if (!pipelineXStatisticalInfo.containsKey(id)) {
+            pipelineXStatisticalInfo.put(id, new PipelineXStatistics());
+        }
+        pipelineXStatisticalInfo.get(id).add(isSink, info);
+        pipelineXStatisticalInfo.get(id).updateInstance(isSink, instance);
+    }
+
+    private void addPlanNodeInfo(int id, String info, int instance) {
         if (!statisticalInfo.containsKey(id)) {
             statisticalInfo.put(id, new ArrayList<String>());
+            statisticalInfoInstance.put(id, new Integer(0));
         }
         statisticalInfo.get(id).add(info);
+        int ins = statisticalInfoInstance.get(id);
+        ins = Math.max(ins, instance);
+        statisticalInfoInstance.put(id, ins);
     }
 
-    private void addDataSinkInfo(String info) {
+    private void addDataSinkInfo(String info, int instance) {
         if (fragmentInfo.get(fragmentId) == null) {
             fragmentInfo.put(fragmentId, new ArrayList<String>());
+            fragmentInfoInstance.put(fragmentId, new Integer(0));
         }
         fragmentInfo.get(fragmentId).add(info);
+        int ins = fragmentInfoInstance.get(fragmentId);
+        ins = Math.max(ins, instance);
+        fragmentInfoInstance.put(fragmentId, ins);
     }
 
-    public void addInfoFromProfile(RuntimeProfile profile, String name, String 
info) {
+    public void addInfoFromProfile(RuntimeProfile profile, String name, String 
info, int instance) {
         if (isPipelineX) {
-            if (profile.sinkOperator()) {
-                name = name + "(Sink)";
-            } else {
-                name = name + "(Operator)";
-            }
-            addPlanNodeInfo(profile.nodeId(), name + ": " + info);
+            addPipelineXPlanNodeInfo(profile.sinkOperator(), profile.nodeId(), 
name + ": " + info, instance);
         } else {
             if (isDataSink) {
-                addDataSinkInfo(name + ": " + info);
+                addDataSinkInfo(name + ": " + info, instance);
             } else {
-                addPlanNodeInfo(profile.nodeId(), name + ": " + info);
+                addPlanNodeInfo(profile.nodeId(), name + ": " + info, 
instance);
             }
         }
     }
 
     public boolean hasInfo(int id) {
-        return statisticalInfo.containsKey(id);
+        if (isPipelineX) {
+            return pipelineXStatisticalInfo.containsKey(id);
+        } else {
+            return statisticalInfo.containsKey(id);
+        }
     }
 
     public void getInfoById(int id, String prefix, StringBuilder str) {
         if (!hasInfo(id)) {
             return;
         }
-        ArrayList<String> infos = statisticalInfo.get(id);
-        Collections.sort(infos);
-        for (String info : infos) {
-            str.append(prefix + info + "\n");
+        if (isPipelineX) {
+            getPipelineXInfoById(id, prefix, str);
+        } else {
+            ArrayList<String> infos = statisticalInfo.get(id);
+            str.append(prefix + "Instance num :" + 
statisticalInfoInstance.get(id) + "\n");
+            Collections.sort(infos);
+            for (String info : infos) {
+                str.append(prefix + info + "\n");
+            }
         }
     }
 
+    private void getPipelineXInfoById(int id, String prefix, StringBuilder 
str) {
+        PipelineXStatistics statistics = pipelineXStatisticalInfo.get(id);
+        str.append(prefix + "Operator: \n");
+        statistics.getOperator(prefix + "  ", str);
+
+        str.append(prefix + "Sink: \n");
+        statistics.getSink(prefix + "  ", str);
+    }
+
     public void getDataSinkInfo(int fragmentIdx, String prefix, StringBuilder 
str) {
         if (!fragmentInfo.containsKey(fragmentIdx)) {
             return;
         }
+        str.append(prefix + "Instance num :" + 
fragmentInfoInstance.get(fragmentIdx) + "\n");
         ArrayList<String> infos = fragmentInfo.get(fragmentIdx);
         Collections.sort(infos);
         for (String info : infos) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 1ba7f66b692..3519ebce254 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -504,7 +504,7 @@ public class RuntimeProfile {
                 String infoString = AVG_TIME_PRE + 
printCounter(newCounter.getValue(), newCounter.getType()) + ", "
                         + MAX_TIME_PRE + printCounter(maxCounter.getValue(), 
maxCounter.getType()) + ", "
                         + MIN_TIME_PRE + printCounter(minCounter.getValue(), 
minCounter.getType());
-                statistics.addInfoFromProfile(src, counterName, infoString);
+                statistics.addInfoFromProfile(src, counterName, infoString, 
rhsCounter.size() + 1);
             }
         } else {
             Counter newCounter = new Counter(counter.getType(), 
counter.getValue());
@@ -514,7 +514,7 @@ public class RuntimeProfile {
                 }
             }
             String infoString = printCounter(newCounter.getValue(), 
newCounter.getType());
-            statistics.addInfoFromProfile(src, counterName, infoString);
+            statistics.addInfoFromProfile(src, counterName, infoString, 
rhsCounter.size() + 1);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to