This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c9b51b8dc Better mse stats (#15901)
2c9b51b8dc is described below

commit 2c9b51b8dcd76e378e133b8f855cedf42e3983ac
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Tue May 27 13:07:24 2025 +0200

    Better mse stats (#15901)
---
 .../components/Query/VisualizeQueryStageStats.tsx  | 81 ++++++++++++++----
 .../query/runtime/InStageStatsTreeBuilder.java     | 95 +++++++++++++---------
 .../query/runtime/MultiStageStatsTreeBuilder.java  |  2 +-
 3 files changed, 123 insertions(+), 55 deletions(-)

diff --git a/pinot-controller/src/main/resources/app/components/Query/VisualizeQueryStageStats.tsx b/pinot-controller/src/main/resources/app/components/Query/VisualizeQueryStageStats.tsx
index cfd39ca377..10b6939aea 100644
--- a/pinot-controller/src/main/resources/app/components/Query/VisualizeQueryStageStats.tsx
+++ b/pinot-controller/src/main/resources/app/components/Query/VisualizeQueryStageStats.tsx
@@ -16,8 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import React from "react";
-import ReactFlow, { Background, Controls, MiniMap, Handle, Node, Edge } from "react-flow-renderer";
+import React, {useMemo, useState} from "react";
+import ReactFlow, {
+  Background,
+  Controls,
+  MiniMap,
+  Handle,
+  Node,
+  Edge,
+  ControlButton
+} from "react-flow-renderer";
 import dagre from "dagre";
 import { Typography, useTheme } from "@material-ui/core";
 import "react-flow-renderer/dist/style.css";
@@ -27,7 +35,8 @@ import isEmpty from "lodash/isEmpty";
  * Main component to visualize query stage stats as a flowchart.
  */
 export const VisualizeQueryStageStats = ({ stageStats }) => {
-  const { nodes, edges } = generateFlowElements(stageStats); // Generate nodes and edges from input data
+  const [simpleMode, setSimpleMode] = useState(true);
+  const { nodes, edges } = useMemo(() => generateFlowElements(stageStats, simpleMode), [stageStats, simpleMode]); // Generate nodes and edges from input data
   
   if(isEmpty(stageStats)) {
     return (
@@ -38,17 +47,22 @@ export const VisualizeQueryStageStats = ({ stageStats }) => {
   }
 
   return (
-    <div style={{ height: 500 }}>
+    <div style={{ height: 1000 }}>
       <ReactFlow
         nodes={nodes}
         edges={edges}
         fitView
         nodeTypes={nodeTypes} // Use custom node types
-        zoomOnScroll={false}
+        zoomOnScroll={true}
       >
         <Background />
-        <Controls showInteractive={false} />
-        <MiniMap />
+        <Controls showInteractive={false}>
+          <ControlButton onClick={() => setSimpleMode(!simpleMode)}>
+            {/* TODO: Look for an icon for this */}
+            {simpleMode ? "Show Details" : "Hide Details"}
+          </ControlButton>
+        </Controls>
+        <MiniMap/>
       </ReactFlow>
     </div>
   );
@@ -124,24 +138,54 @@ const layoutNodesAndEdges = (nodes, edges, direction = "TB") => {
   };
 };
 
+/**
+ * Simplifies the data structure for a node to display only the most useful information.
+ * @param data
+ */
+const calculateSimpleData = (data) => {
+  const simpleData = {
+    type: data.type,
+    clockTimeMs: data.clockTimeMs,
+    emittedRows: data.emittedRows,
+  }
+  if (data.stage) {
+    simpleData["stage"] = data.stage;
+  }
+  if (data.parallelism) {
+    simpleData["parallelism"] = data.parallelism;
+  }
+  if (data.table) {
+    simpleData["table"] = data.table;
+  }
+  if (data.numEntriesScannedPostFilter) {
+    simpleData["numEntriesScannedPostFilter"] = data.numEntriesScannedPostFilter;
+  }
+  if (data.numEntriesScannedInFilter) {
+    simpleData["numEntriesScannedInFilter"] = data.numEntriesScannedInFilter;
+  }
+  return simpleData;
+}
+
 /**
  * Recursively generates nodes and edges for the flowchart from a hierarchical data structure.
  */
-const generateFlowElements = (stats) => {
+const generateFlowElements = (stats, simpleMode) => {
   const stageRoots: Map<Number, Node> = new Map();
   const nodes: Node[] = [];
   const edges: Edge[] = [];
 
-  const createFlowNode = (data, id, parentId) => {
-    const { width, height } = calculateNodeDimensions(data);
+  const createFlowNode = (data, id, parentId, clockTime, strokeWidth) => {
+    const actualNodeData = simpleMode ? calculateSimpleData(data) : data;
+
+    const { width, height } = calculateNodeDimensions(actualNodeData);
 
     // Add the node
-    const flowNode: Node = { id, type: "customNode", data, position: { x: 0, y: 0 }, width, height };
+    const flowNode: Node = { id, type: "customNode", data: actualNodeData, position: { x: 0, y: 0 }, width, height };
     nodes.push(flowNode);
 
     // Add an edge if this node has a parent
     if (parentId) {
-      edges.push({ id: `edge-${id}`, source: parentId, target: id });
+      edges.push({ id: `edge-${id}`, source: parentId, target: id, style: { strokeWidth }});
     }
 
     return flowNode;
@@ -152,9 +196,12 @@ const generateFlowElements = (stats) => {
    *
    * Nodes that have been already added to the graph are not added again.
    */
-  const traverseTree = (node, id, parentId) => {
+  const traverseTree = (node, id, parentId, totalTime) => {
     const { children, ...data } = node;
 
+    const clockTime = data["clockTimeMs"] || 0;
+    const strokeWidth: number = Math.max(1, Math.min(50, clockTime / totalTime * 50));
+
     const stageId = data["stage"];
     if (stageId) {
       const oldFlowNode = stageRoots.get(stageId);
@@ -162,21 +209,21 @@ const generateFlowElements = (stats) => {
         // Add an edge if this node has a parent
         if (parentId) {
           const id = oldFlowNode.id;
-          edges.push({ id: `edge-${parentId}-${id}`, source: parentId, target: id });
+          edges.push({ id: `edge-${parentId}-${id}`, source: parentId, target: id, style: { strokeWidth }, label: clockTime + "ms" });
           return;
         }
       }
     }
 
-    const newFlowNode = createFlowNode(data, id, parentId);
+    const newFlowNode = createFlowNode(data, id, parentId, clockTime, strokeWidth);
     if (stageId) {
       stageRoots.set(stageId, newFlowNode);
     }
     // Recursively process children
-    children?.forEach((child, idx) => traverseTree(child, `${id}.${idx+1}`, newFlowNode.id));
+    children?.forEach((child, idx) => traverseTree(child, `${id}.${idx+1}`, newFlowNode.id, totalTime));
   };
 
-  traverseTree(stats, "0", null); // Start traversal from the root node
+  traverseTree(stats, "0", null, stats["executionTimeMs"]); // Start traversal from the root node
 
   return layoutNodesAndEdges(nodes, edges);
 };
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
index 048af39253..e214030805 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
@@ -21,10 +21,10 @@ package org.apache.pinot.query.runtime;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.IntFunction;
+import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.BasePlanNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -41,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.SortNode;
 import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -48,7 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void> {
+public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, InStageStatsTreeBuilder.Context> {
   private static final Logger LOGGER = LoggerFactory.getLogger(InStageStatsTreeBuilder.class);
 
   private final MultiStageQueryStats.StageStats _stageStats;
@@ -62,18 +63,25 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
     _jsonStatsByStage = jsonStatsByStage;
   }
 
-  private ObjectNode selfNode(MultiStageOperator.Type type) {
+  private ObjectNode selfNode(MultiStageOperator.Type type, Context context) {
     ObjectNode json = JsonUtils.newObjectNode();
     json.put("type", type.toString());
-    Iterator<Map.Entry<String, JsonNode>> statsIt = _stageStats.getOperatorStats(_index).asJson().fields();
-    while (statsIt.hasNext()) {
-      Map.Entry<String, JsonNode> entry = statsIt.next();
+    for (Map.Entry<String, JsonNode> entry : _stageStats.getOperatorStats(_index).asJson().properties()) {
       json.set(entry.getKey(), entry.getValue());
     }
+
+    if (json.get("parallelism") == null) {
+      json.put("parallelism", context._parallelism);
+    }
+
+    JsonNode executionTimeMs = json.get("executionTimeMs");
+    long cpuTimeMs = executionTimeMs == null ? 0 : executionTimeMs.asLong(0);
+    json.put("clockTimeMs", cpuTimeMs / context._parallelism);
+
     return json;
   }
 
-  private ObjectNode recursiveCase(BasePlanNode node, MultiStageOperator.Type expectedType) {
+  private ObjectNode recursiveCase(BasePlanNode node, MultiStageOperator.Type expectedType, Context context) {
     MultiStageOperator.Type type = _stageStats.getOperatorType(_index);
     /*
      Sometimes the operator type is not what we expect, but we can still build the tree
@@ -84,7 +92,7 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
     */
     if (type != expectedType) {
       if (type == MultiStageOperator.Type.LEAF) {
-        return selfNode(MultiStageOperator.Type.LEAF);
+        return selfNode(MultiStageOperator.Type.LEAF, context);
       }
       List<PlanNode> inputs = node.getInputs();
       int childrenSize = inputs.size();
@@ -92,21 +100,21 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
         case 0:
           return JsonUtils.newObjectNode();
         case 1:
-          return inputs.get(0).visit(this, null);
+          return inputs.get(0).visit(this, context);
         default:
           ObjectNode json = JsonUtils.newObjectNode();
           ArrayNode children = JsonUtils.newArrayNode();
           for (int i = 0; i < childrenSize; i++) {
             _index--;
             if (inputs.size() > i) {
-              children.add(inputs.get(i).visit(this, null));
+              children.add(inputs.get(i).visit(this, context));
             }
           }
           json.set(CHILDREN_KEY, children);
           return json;
       }
     }
-    ObjectNode json = selfNode(type);
+    ObjectNode json = selfNode(type, context);
     List<PlanNode> inputs = node.getInputs();
     int size = inputs.size();
     JsonNode[] childrenArr = new JsonNode[size];
@@ -118,7 +126,7 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
     for (int i = size - 1; i >= 0; i--) {
       PlanNode planNode = inputs.get(i);
       _index--;
-      JsonNode child = planNode.visit(this, null);
+      JsonNode child = planNode.visit(this, context);
 
       childrenArr[i] = child;
     }
@@ -127,28 +135,28 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
   }
 
   @Override
-  public ObjectNode visitAggregate(AggregateNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.AGGREGATE);
+  public ObjectNode visitAggregate(AggregateNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.AGGREGATE, context);
   }
 
   @Override
-  public ObjectNode visitFilter(FilterNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.FILTER);
+  public ObjectNode visitFilter(FilterNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.FILTER, context);
   }
 
   @Override
-  public ObjectNode visitJoin(JoinNode node, Void context) {
+  public ObjectNode visitJoin(JoinNode node, Context context) {
     if (node.getJoinStrategy() == JoinNode.JoinStrategy.HASH) {
-      return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN);
+      return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context);
     } else {
       assert node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP;
-      return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN);
+      return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN, context);
     }
   }
 
   @Override
-  public ObjectNode visitMailboxReceive(MailboxReceiveNode node, Void context) {
-    ObjectNode json = selfNode(MultiStageOperator.Type.MAILBOX_RECEIVE);
+  public ObjectNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
+    ObjectNode json = selfNode(MultiStageOperator.Type.MAILBOX_RECEIVE, context);
 
     ArrayNode children = JsonUtils.newArrayNode();
     int senderStageId = node.getSenderStageId();
@@ -158,37 +166,42 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
   }
 
   @Override
-  public ObjectNode visitMailboxSend(MailboxSendNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.MAILBOX_SEND);
+  public ObjectNode visitMailboxSend(MailboxSendNode node, Context context) {
+    @SuppressWarnings("unchecked")
+    StatMap<MailboxSendOperator.StatKey> operatorStats =
+        (StatMap<MailboxSendOperator.StatKey>) _stageStats.getOperatorStats(_index);
+    long parallelism = operatorStats.getLong(MailboxSendOperator.StatKey.PARALLELISM);
+    Context myContext = new Context((int) parallelism);
+    return recursiveCase(node, MultiStageOperator.Type.MAILBOX_SEND, myContext);
   }
 
   @Override
-  public ObjectNode visitProject(ProjectNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.TRANSFORM);
+  public ObjectNode visitProject(ProjectNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.TRANSFORM, context);
   }
 
   @Override
-  public ObjectNode visitSort(SortNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.SORT_OR_LIMIT);
+  public ObjectNode visitSort(SortNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.SORT_OR_LIMIT, context);
   }
 
   @Override
-  public ObjectNode visitTableScan(TableScanNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.LEAF);
+  public ObjectNode visitTableScan(TableScanNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.LEAF, context);
   }
 
   @Override
-  public ObjectNode visitValue(ValueNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.LITERAL);
+  public ObjectNode visitValue(ValueNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.LITERAL, context);
   }
 
   @Override
-  public ObjectNode visitWindow(WindowNode node, Void context) {
-    return recursiveCase(node, MultiStageOperator.Type.WINDOW);
+  public ObjectNode visitWindow(WindowNode node, Context context) {
+    return recursiveCase(node, MultiStageOperator.Type.WINDOW, context);
   }
 
   @Override
-  public ObjectNode visitSetOp(SetOpNode node, Void context) {
+  public ObjectNode visitSetOp(SetOpNode node, Context context) {
     MultiStageOperator.Type type;
     switch (node.getSetOpType()) {
       case UNION:
@@ -203,16 +216,24 @@ public class InStageStatsTreeBuilder implements PlanNodeVisitor<ObjectNode, Void
       default:
         throw new IllegalStateException("Unexpected set op type: " + node.getSetOpType());
     }
-    return recursiveCase(node, type);
+    return recursiveCase(node, type, context);
   }
 
   @Override
-  public ObjectNode visitExchange(ExchangeNode node, Void context) {
+  public ObjectNode visitExchange(ExchangeNode node, Context context) {
     throw new UnsupportedOperationException("ExchangeNode should not be visited");
   }
 
   @Override
-  public ObjectNode visitExplained(ExplainedNode node, Void context) {
+  public ObjectNode visitExplained(ExplainedNode node, Context context) {
     throw new UnsupportedOperationException("ExplainedNode should not be visited");
   }
+
+  public static class Context {
+    private final int _parallelism;
+
+    public Context(int parallelism) {
+      _parallelism = parallelism;
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
index 927f107c53..98016e9166 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
@@ -61,6 +61,6 @@ public class MultiStageStatsTreeBuilder {
       return jsonNodes;
     }
     InStageStatsTreeBuilder treeBuilder = new InStageStatsTreeBuilder(stageStats, this::jsonStatsByStage);
-    return planNode.visit(treeBuilder, null);
+    return planNode.visit(treeBuilder, new InStageStatsTreeBuilder.Context(1));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to