This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3ffa7c3e32 [multistage][stats] clean up some stats (#10390) 3ffa7c3e32 is described below commit 3ffa7c3e3237277c6a60fb3379d0a5c44e8240e0 Author: Rong Rong <ro...@apache.org> AuthorDate: Wed Mar 8 14:49:34 2023 -0800 [multistage][stats] clean up some stats (#10390) * adding more stats for wall time compute * also enable trace support --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/common/datatable/DataTable.java | 6 ++-- .../response/broker/BrokerResponseNativeV2.java | 3 +- .../response/broker/BrokerResponseStats.java | 32 ++++++++++++++--- .../query/reduce/ExecutionStatsAggregator.java | 40 +++++++++++++++++++--- .../apache/pinot/core/util/trace/TraceContext.java | 7 ++-- .../apache/pinot/query/runtime/QueryRunner.java | 3 +- .../query/runtime/operator/MultiStageOperator.java | 1 - .../query/runtime/operator/OperatorStats.java | 7 ++++ .../runtime/plan/ServerRequestPlanVisitor.java | 6 ++-- 9 files changed, 85 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index ea996cba6d..928834359f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -132,11 +132,13 @@ public interface DataTable { NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT), NUM_ROWS(29, "numRows", MetadataValueType.INT), OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", MetadataValueType.LONG), - OPERATOR_ID(31, "operatorId", MetadataValueType.STRING); + OPERATOR_ID(31, "operatorId", MetadataValueType.STRING), + OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", MetadataValueType.LONG), + OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! - private static final int MAX_ID = 31; + private static final int MAX_ID = 33; private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 5bf631e129..2eacbc23aa 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -81,7 +81,8 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative { } public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStats) { - if (!brokerResponseStats.getOperatorStats().isEmpty()) { + // StageExecutionWallTime will always be there, other stats are optional such as OperatorStats + if (brokerResponseStats.getStageExecWallTimeMs() != -1) { _stageIdStats.put(stageId, brokerResponseStats); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java index 83cbd16f3c..23482e905c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java @@ -33,11 +33,11 @@ import org.apache.pinot.spi.utils.JsonUtils; // same metadataKey // TODO: Replace member fields with a simple map of <MetadataKey, Object> // TODO: Add a subStat field, stage level subStats will contain each operator stats -@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "numServersQueried", - "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", - "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", - "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", - "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", +@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit", + "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried", + "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", + "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", + "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "traceInfo", "operatorStats", "tableNames"}) @@ -47,6 +47,8 @@ public class BrokerResponseStats extends BrokerResponseNative { private int _numBlocks = 0; private int _numRows = 0; private long _stageExecutionTimeMs = 0; + private int _stageExecutionUnit = 0; + private long _stageExecWallTimeMs = -1; private Map<String, Map<String, String>> _operatorStats = new HashMap<>(); private List<String> _tableNames = new ArrayList<>(); @@ -85,6 +87,26 @@ public class BrokerResponseStats extends BrokerResponseNative { _stageExecutionTimeMs = stageExecutionTimeMs; } + @JsonProperty("stageExecWallTimeMs") + public long getStageExecWallTimeMs() { + return _stageExecWallTimeMs; + } + + @JsonProperty("stageExecWallTimeMs") + public void setStageExecWallTimeMs(long stageExecWallTimeMs) { + _stageExecWallTimeMs = stageExecWallTimeMs; + } + + @JsonProperty("stageExecutionUnit") + public long getStageExecutionUnit() { + return _stageExecutionUnit; + } + + @JsonProperty("stageExecutionUnit") + public void setStageExecutionUnit(int stageExecutionUnit) { + _stageExecutionUnit = stageExecutionUnit; + } + public String toJsonString() throws IOException { return JsonUtils.objectToString(this); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index 2faefee584..c8ef48af4a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.query.reduce; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,6 +78,9 @@ public class ExecutionStatsAggregator { private int _numBlocks = 0; private int _numRows = 0; private long _stageExecutionTimeMs = 0; + private long _stageExecStartTimeMs = -1; + private long _stageExecEndTimeMs = -1; + private int _stageExecutionUnit = 0; public ExecutionStatsAggregator(boolean enableTrace) { _enableTrace = enableTrace; @@ -88,10 +92,6 @@ public class ExecutionStatsAggregator { public synchronized void aggregate(@Nullable ServerRoutingInstance routingInstance, Map<String, String> metadata, Map<Integer, String> exceptions) { - // Reduce on trace info. - if (_enableTrace && metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) { - _traceInfo.put(routingInstance.getShortName(), metadata.get(DataTable.MetadataKey.TRACE_INFO.getName())); - } String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName()); String tableName = null; @@ -106,12 +106,21 @@ public class ExecutionStatsAggregator { } TableType tableType = null; + String instanceName = null; if (routingInstance != null) { tableType = routingInstance.getTableType(); + instanceName = routingInstance.getShortName();; } else if (tableName != null) { tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + instanceName = tableName; } else { tableType = null; + instanceName = null; + } + + // Reduce on trace info. + if (_enableTrace && metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName()) && instanceName != null) { + _traceInfo.put(instanceName, metadata.get(DataTable.MetadataKey.TRACE_INFO.getName())); } String operatorId = metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName()); @@ -256,6 +265,21 @@ public class ExecutionStatsAggregator { String operatorExecutionTimeString = metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()); if (operatorExecutionTimeString != null) { _stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString); + _stageExecutionUnit += 1; + } + + String operatorExecStartTimeString = metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName()); + if (operatorExecStartTimeString != null) { + long operatorExecStartTime = Long.parseLong(operatorExecStartTimeString); + _stageExecStartTimeMs = _stageExecStartTimeMs == -1 ? operatorExecStartTime + : Math.min(operatorExecStartTime, _stageExecStartTimeMs); + } + + String operatorExecEndTimeString = metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName()); + if (operatorExecEndTimeString != null) { + long operatorExecEndTime = Long.parseLong(operatorExecEndTimeString); + _stageExecEndTimeMs = _stageExecEndTimeMs == -1 ? operatorExecEndTime + : Math.max(operatorExecEndTime, _stageExecEndTimeMs); } } @@ -344,7 +368,13 @@ public class ExecutionStatsAggregator { brokerResponseStats.setNumBlocks(_numBlocks); brokerResponseStats.setNumRows(_numRows); brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs); - brokerResponseStats.setOperatorStats(_operatorStats); + brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - _stageExecStartTimeMs); + brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit); + if (_enableTrace) { + brokerResponseStats.setOperatorStats(_operatorStats); + } else { + brokerResponseStats.setOperatorStats(Collections.emptyMap()); + } brokerResponseStats.setTableNames(new ArrayList<>(_tableNames)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java index 4d430434e5..0bdaf0da62 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java @@ -189,8 +189,11 @@ public final class TraceContext { */ public static String getTraceInfo() { ArrayNode jsonTraces = JsonUtils.newArrayNode(); - for (Trace trace : REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId)) { - jsonTraces.add(trace.toJson()); + Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId); + if (traces != null && !traces.isEmpty()) { + for (Trace trace : traces) { + jsonTraces.add(trace.toJson()); + } } return jsonTraces.toString(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 4c7380943e..a1d22c678f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -184,8 +184,7 @@ public class QueryRunner { MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService, new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId, sendNode.getStageId(), _rootServer), receivingStageMetadata.getServerInstances(), - sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _rootServer, - serverQueryRequests.get(0).getRequestId(), + sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _rootServer, requestId, sendNode.getStageId(), sendNode.getReceiverStageId()); int blockCounter = 0; while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index a6bc863457..55fbc8830d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -73,7 +73,6 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>, for (MultiStageOperator op : getChildOperators()) { _operatorStatsMap.putAll(op.getOperatorStatsMap()); } - if (!_operatorStats.getExecutionStats().isEmpty()) { _operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), _operatorId); _operatorStatsMap.put(_operatorId, _operatorStats); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java index 74cefbbc90..715be163bb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java @@ -39,6 +39,7 @@ public class OperatorStats { private int _numBlock = 0; private int _numRows = 0; + private long _startTimeMs = -1; private final Map<String, String> _executionStats; public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) { @@ -50,6 +51,7 @@ public class OperatorStats { } public void startTimer() { + _startTimeMs = _startTimeMs == -1 ? System.currentTimeMillis() : _startTimeMs; if (!_executeStopwatch.isRunning()) { _executeStopwatch.start(); } @@ -79,6 +81,11 @@ public class OperatorStats { _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(), String.valueOf(_numRows)); _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(), String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS))); + // wall time are recorded slightly longer than actual execution but it is OK. + _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(), + String.valueOf(_startTimeMs)); + _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(), + String.valueOf(System.currentTimeMillis())); return _executionStats; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java index 97924586b1..55141cc739 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java @@ -93,7 +93,9 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) { // Before-visit: construct the ServerPlanRequestContext baseline - long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); + // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing. + long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + + (stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 : 0); long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); PinotQuery pinotQuery = new PinotQuery(); Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap); @@ -140,7 +142,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setBrokerId("unknown"); - instanceRequest.setEnableTrace(false); + instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE))); instanceRequest.setSearchSegments(segmentList); instanceRequest.setQuery(brokerRequest); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org