This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ffa7c3e32 [multistage][stats] clean up some stats (#10390)
3ffa7c3e32 is described below

commit 3ffa7c3e3237277c6a60fb3379d0a5c44e8240e0
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Mar 8 14:49:34 2023 -0800

    [multistage][stats] clean up some stats (#10390)
    
    * add more stats for computing stage wall time
    * also enable trace support
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/common/datatable/DataTable.java   |  6 ++--
 .../response/broker/BrokerResponseNativeV2.java    |  3 +-
 .../response/broker/BrokerResponseStats.java       | 32 ++++++++++++++---
 .../query/reduce/ExecutionStatsAggregator.java     | 40 +++++++++++++++++++---
 .../apache/pinot/core/util/trace/TraceContext.java |  7 ++--
 .../apache/pinot/query/runtime/QueryRunner.java    |  3 +-
 .../query/runtime/operator/MultiStageOperator.java |  1 -
 .../query/runtime/operator/OperatorStats.java      |  7 ++++
 .../runtime/plan/ServerRequestPlanVisitor.java     |  6 ++--
 9 files changed, 85 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index ea996cba6d..928834359f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -132,11 +132,13 @@ public interface DataTable {
     NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT),
     NUM_ROWS(29, "numRows", MetadataValueType.INT),
     OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", 
MetadataValueType.LONG),
-    OPERATOR_ID(31, "operatorId", MetadataValueType.STRING);
+    OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
+    OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", 
MetadataValueType.LONG),
+    OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", 
MetadataValueType.LONG);
 
     // We keep this constant to track the max id added so far for backward 
compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 31;
+    private static final int MAX_ID = 33;
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new 
MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 5bf631e129..2eacbc23aa 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -81,7 +81,8 @@ public class BrokerResponseNativeV2 extends 
BrokerResponseNative {
   }
 
   public void addStageStat(Integer stageId, BrokerResponseStats 
brokerResponseStats) {
-    if (!brokerResponseStats.getOperatorStats().isEmpty()) {
+    // StageExecutionWallTime will always be there; other stats, such as 
OperatorStats, are optional
+    if (brokerResponseStats.getStageExecWallTimeMs() != -1) {
       _stageIdStats.put(stageId, brokerResponseStats);
     }
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index 83cbd16f3c..23482e905c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,11 +33,11 @@ import org.apache.pinot.spi.utils.JsonUtils;
 //  same metadataKey
 // TODO: Replace member fields with a simple map of <MetadataKey, Object>
 // TODO: Add a subStat field, stage level subStats will contain each operator 
stats
-@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", 
"stageExecutionTimeMs", "numServersQueried",
-    "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", 
"numSegmentsMatched",
-    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", 
"numConsumingSegmentsMatched",
-    "numDocsScanned", "numEntriesScannedInFilter", 
"numEntriesScannedPostFilter", "numGroupsLimitReached",
-    "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", 
"realtimeThreadCpuTimeNs",
+@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", 
"stageExecutionTimeMs", "stageExecutionUnit",
+    "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", 
"numServersResponded", "numSegmentsQueried",
+    "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+    "numConsumingSegmentsMatched", "numDocsScanned", 
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numGroupsLimitReached", "totalDocs", "timeUsedMs", 
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
     "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
     "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
     "traceInfo", "operatorStats", "tableNames"})
@@ -47,6 +47,8 @@ public class BrokerResponseStats extends BrokerResponseNative 
{
   private int _numBlocks = 0;
   private int _numRows = 0;
   private long _stageExecutionTimeMs = 0;
+  private int _stageExecutionUnit = 0;
+  private long _stageExecWallTimeMs = -1;
   private Map<String, Map<String, String>> _operatorStats = new HashMap<>();
   private List<String> _tableNames = new ArrayList<>();
 
@@ -85,6 +87,26 @@ public class BrokerResponseStats extends 
BrokerResponseNative {
     _stageExecutionTimeMs = stageExecutionTimeMs;
   }
 
+  @JsonProperty("stageExecWallTimeMs")
+  public long getStageExecWallTimeMs() {
+    return _stageExecWallTimeMs;
+  }
+
+  @JsonProperty("stageExecWallTimeMs")
+  public void setStageExecWallTimeMs(long stageExecWallTimeMs) {
+    _stageExecWallTimeMs = stageExecWallTimeMs;
+  }
+
+  @JsonProperty("stageExecutionUnit")
+  public long getStageExecutionUnit() {
+    return _stageExecutionUnit;
+  }
+
+  @JsonProperty("stageExecutionUnit")
+  public void setStageExecutionUnit(int stageExecutionUnit) {
+    _stageExecutionUnit = stageExecutionUnit;
+  }
+
   public String toJsonString()
       throws IOException {
     return JsonUtils.objectToString(this);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 2faefee584..c8ef48af4a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -77,6 +78,9 @@ public class ExecutionStatsAggregator {
   private int _numBlocks = 0;
   private int _numRows = 0;
   private long _stageExecutionTimeMs = 0;
+  private long _stageExecStartTimeMs = -1;
+  private long _stageExecEndTimeMs = -1;
+  private int _stageExecutionUnit = 0;
 
   public ExecutionStatsAggregator(boolean enableTrace) {
     _enableTrace = enableTrace;
@@ -88,10 +92,6 @@ public class ExecutionStatsAggregator {
 
   public synchronized void aggregate(@Nullable ServerRoutingInstance 
routingInstance, Map<String, String> metadata,
       Map<Integer, String> exceptions) {
-    // Reduce on trace info.
-    if (_enableTrace && 
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) {
-      _traceInfo.put(routingInstance.getShortName(), 
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
-    }
 
     String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName());
     String tableName = null;
@@ -106,12 +106,21 @@ public class ExecutionStatsAggregator {
     }
 
     TableType tableType = null;
+    String instanceName = null;
     if (routingInstance != null) {
       tableType = routingInstance.getTableType();
+      instanceName = routingInstance.getShortName();;
     } else if (tableName != null) {
       tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+      instanceName = tableName;
     } else {
       tableType = null;
+      instanceName = null;
+    }
+
+    // Reduce on trace info.
+    if (_enableTrace && 
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName()) && 
instanceName != null) {
+      _traceInfo.put(instanceName, 
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
     }
 
     String operatorId = 
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
@@ -256,6 +265,21 @@ public class ExecutionStatsAggregator {
     String operatorExecutionTimeString = 
metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName());
     if (operatorExecutionTimeString != null) {
       _stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString);
+      _stageExecutionUnit += 1;
+    }
+
+    String operatorExecStartTimeString = 
metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName());
+    if (operatorExecStartTimeString != null) {
+      long operatorExecStartTime = Long.parseLong(operatorExecStartTimeString);
+      _stageExecStartTimeMs = _stageExecStartTimeMs == -1 ? 
operatorExecStartTime
+          : Math.min(operatorExecStartTime, _stageExecStartTimeMs);
+    }
+
+    String operatorExecEndTimeString = 
metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName());
+    if (operatorExecEndTimeString != null) {
+      long operatorExecEndTime = Long.parseLong(operatorExecEndTimeString);
+      _stageExecEndTimeMs = _stageExecEndTimeMs == -1 ? operatorExecEndTime
+          : Math.max(operatorExecEndTime, _stageExecEndTimeMs);
     }
   }
 
@@ -344,7 +368,13 @@ public class ExecutionStatsAggregator {
     brokerResponseStats.setNumBlocks(_numBlocks);
     brokerResponseStats.setNumRows(_numRows);
     brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
-    brokerResponseStats.setOperatorStats(_operatorStats);
+    brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - 
_stageExecStartTimeMs);
+    brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
+    if (_enableTrace) {
+      brokerResponseStats.setOperatorStats(_operatorStats);
+    } else {
+      brokerResponseStats.setOperatorStats(Collections.emptyMap());
+    }
     brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
index 4d430434e5..0bdaf0da62 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
@@ -189,8 +189,11 @@ public final class TraceContext {
    */
   public static String getTraceInfo() {
     ArrayNode jsonTraces = JsonUtils.newArrayNode();
-    for (Trace trace : 
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId)) {
-      jsonTraces.add(trace.toJson());
+    Queue<Trace> traces = 
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId);
+    if (traces != null && !traces.isEmpty()) {
+      for (Trace trace : traces) {
+        jsonTraces.add(trace.toJson());
+      }
     }
     return jsonTraces.toString();
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4c7380943e..a1d22c678f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -184,8 +184,7 @@ public class QueryRunner {
       MailboxSendOperator mailboxSendOperator = new 
MailboxSendOperator(_mailboxService,
           new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema(), requestId,
               sendNode.getStageId(), _rootServer), 
receivingStageMetadata.getServerInstances(),
-          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_rootServer,
-          serverQueryRequests.get(0).getRequestId(),
+          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_rootServer, requestId,
           sendNode.getStageId(), sendNode.getReceiverStageId());
       int blockCounter = 0;
       while 
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index a6bc863457..55fbc8830d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -73,7 +73,6 @@ public abstract class MultiStageOperator implements 
Operator<TransferableBlock>,
           for (MultiStageOperator op : getChildOperators()) {
             _operatorStatsMap.putAll(op.getOperatorStatsMap());
           }
-
           if (!_operatorStats.getExecutionStats().isEmpty()) {
             
_operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), 
_operatorId);
             _operatorStatsMap.put(_operatorId, _operatorStats);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 74cefbbc90..715be163bb 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -39,6 +39,7 @@ public class OperatorStats {
 
   private int _numBlock = 0;
   private int _numRows = 0;
+  private long _startTimeMs = -1;
   private final Map<String, String> _executionStats;
 
   public OperatorStats(long requestId, int stageId, VirtualServerAddress 
serverAddress, String operatorType) {
@@ -50,6 +51,7 @@ public class OperatorStats {
   }
 
   public void startTimer() {
+    _startTimeMs = _startTimeMs == -1 ? System.currentTimeMillis() : 
_startTimeMs;
     if (!_executeStopwatch.isRunning()) {
       _executeStopwatch.start();
     }
@@ -79,6 +81,11 @@ public class OperatorStats {
     _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(), 
String.valueOf(_numRows));
     
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
         String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
+    // Wall time is recorded as slightly longer than the actual execution, but that is 
OK.
+    
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
+        String.valueOf(_startTimeMs));
+    
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
+        String.valueOf(System.currentTimeMillis()));
     return _executionStats;
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 97924586b1..55141cc739 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -93,7 +93,9 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
       DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, 
TableConfig tableConfig, Schema schema,
       TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> 
segmentList) {
     // Before-visit: construct the ServerPlanRequestContext baseline
-    long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+    // Make the requestId unique for leaf stages; otherwise it causes problems 
with stats/metrics/tracing.
+    long requestId = 
(Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) 
<< 16)
+        + (stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 
: 0);
     long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     PinotQuery pinotQuery = new PinotQuery();
     Integer leafNodeLimit = 
QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
@@ -140,7 +142,7 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setBrokerId("unknown");
-    instanceRequest.setEnableTrace(false);
+    
instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE)));
     instanceRequest.setSearchSegments(segmentList);
     instanceRequest.setQuery(brokerRequest);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to