Jackie-Jiang commented on code in PR #12704: URL: https://github.com/apache/pinot/pull/12704#discussion_r1580263654
########## pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java: ########## @@ -77,20 +100,301 @@ public static BrokerResponseNativeV2 empty() { return new BrokerResponseNativeV2(); } - public static BrokerResponseNativeV2 fromJsonString(String jsonString) - throws IOException { - return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class); + public void addStageStats(JsonNode stageStats) { + ObjectNode node = JsonUtils.newObjectNode(); + node.put("stage", _stageIdStats.size()); + node.set("stats", stageStats); + _stageIdStats.add(node); + } + + @JsonProperty + public List<JsonNode> getStageStats() { + return _stageIdStats; + } + + @JsonProperty + public long getMaxRows() { + return _maxRows; + } + + public void mergeMaxRows(long maxRows) { + _maxRows = Math.max(_maxRows, maxRows); + } + + @Override + public void setTimeUsedMs(long timeUsedMs) { + _serverStats.merge(DataTable.MetadataKey.TIME_USED_MS, timeUsedMs); + } + + @Override + public long getNumDocsScanned() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_DOCS_SCANNED); + } + + @Override + public long getNumEntriesScannedInFilter() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER); + } + + @Override + public long getNumEntriesScannedPostFilter() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER); + } + + @Override + public long getNumSegmentsQueried() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED); + } + + @Override + public long getNumSegmentsProcessed() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED); + } + + @Override + public long getNumSegmentsMatched() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED); + } + + @Override + public long getNumConsumingSegmentsQueried() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED); + } + + @Override + public long 
getNumConsumingSegmentsProcessed() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED); + } + + @Override + public long getNumConsumingSegmentsMatched() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED); + } + + @Override + public long getMinConsumingFreshnessTimeMs() { + return _serverStats.getLong(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS); + } + + @Override + public long getTotalDocs() { + return _serverStats.getLong(DataTable.MetadataKey.TOTAL_DOCS); + } + + @Override + public boolean isNumGroupsLimitReached() { + return _serverStats.getBoolean(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED); + } + + public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) { + _serverStats.merge(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED, numGroupsLimitReached); + } + + @Override + public long getNumSegmentsPrunedByServer() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER); + } + + @Override + public long getNumSegmentsPrunedInvalid() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID); } - public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStats) { - // StageExecutionWallTime will always be there, other stats are optional such as OperatorStats - if (brokerResponseStats.getStageExecWallTimeMs() != -1) { - _stageIdStats.put(stageId, brokerResponseStats); + @Override + public long getNumSegmentsPrunedByLimit() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT); + } + + @Override + public long getNumSegmentsPrunedByValue() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE); + } + + @Override + public long getExplainPlanNumEmptyFilterSegments() { + return _serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS); + } + + @Override + public long getExplainPlanNumMatchAllFilterSegments() { + return 
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS); + } + + @Override + public long getOfflineTotalCpuTimeNs() { + return getOfflineThreadCpuTimeNs() + getOfflineSystemActivitiesCpuTimeNs() + + getOfflineResponseSerializationCpuTimeNs(); + } + + @Override + public long getRealtimeTotalCpuTimeNs() { + return getRealtimeThreadCpuTimeNs() + getRealtimeSystemActivitiesCpuTimeNs() + + getRealtimeResponseSerializationCpuTimeNs(); + } + + @Override + public void setExceptions(List<ProcessingException> exceptions) { + for (ProcessingException exception : exceptions) { + _processingExceptions.add(new QueryProcessingException(exception.getErrorCode(), exception.getMessage())); } } - @JsonProperty("stageStats") - public Map<Integer, BrokerResponseStats> getStageIdStats() { - return _stageIdStats; + public void addToExceptions(QueryProcessingException processingException) { + _processingExceptions.add(processingException); + } + + @Override + public int getNumServersQueried() { + return _numServersQueried; + } + + @Override + public void setNumServersQueried(int numServersQueried) { + _numServersQueried = numServersQueried; + } + + @Override + public int getNumServersResponded() { + return _numServersResponded; + } + + @Override + public void setNumServersResponded(int numServersResponded) { + _numServersResponded = numServersResponded; + } + + @JsonProperty("maxRowsInJoinReached") + public boolean isMaxRowsInJoinReached() { + return _maxRowsInJoinReached; + } + + @JsonProperty("maxRowsInJoinReached") + public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) { + _maxRowsInJoinReached |= maxRowsInJoinReached; + } + + @Override + public int getExceptionsSize() { + return _processingExceptions.size(); + } + + @Override + public void setResultTable(ResultTable resultTable) { + _resultTable = resultTable; + } + + @Override + public ResultTable getResultTable() { + return _resultTable; + } + + @Override + public 
List<QueryProcessingException> getProcessingExceptions() { + return List.of(); + } + + @Override + public int getNumRowsResultSet() { + return 0; + } + + @Override + public long getOfflineThreadCpuTimeNs() { + return _offlineThreadCpuTimeNs; + } + + @Override + public long getRealtimeThreadCpuTimeNs() { + return _realtimeThreadCpuTimeNs; + } + + @Override + public long getOfflineSystemActivitiesCpuTimeNs() { + return _offlineSystemActivitiesCpuTimeNs; + } + + @Override + public long getRealtimeSystemActivitiesCpuTimeNs() { + return _realtimeSystemActivitiesCpuTimeNs; + } + + @Override + public long getOfflineResponseSerializationCpuTimeNs() { + return _offlineResponseSerializationCpuTimeNs; + } + + @Override + public long getRealtimeResponseSerializationCpuTimeNs() { + return _realtimeResponseSerializationCpuTimeNs; + } + + @Override + public long getNumSegmentsPrunedByBroker() { + return _numSegmentsPrunedByBroker; + } + + @Override + public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) { + _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker; + } + + @Override + public String getRequestId() { + return _requestId; + } + + @Override + public void setRequestId(String requestId) { + _requestId = requestId; + } + + @Override + public String getBrokerId() { + return _brokerId; + } + + @Override + public void setBrokerId(String requestId) { + _brokerId = requestId; + } + + @Override + public long getBrokerReduceTimeMs() { + return _brokerReduceTimeMs; + } + + @Override + public void setBrokerReduceTimeMs(long brokerReduceTimeMs) { + _brokerReduceTimeMs = brokerReduceTimeMs; + } + + @JsonProperty(access = JsonProperty.Access.READ_ONLY) + @Override + public boolean isPartialResult() { + return isNumGroupsLimitReached() || getExceptionsSize() > 0 || isMaxRowsInJoinReached(); + } + + public void addServerStats(StatMap<DataTable.MetadataKey> serverStats) { + // Set execution statistics. 
+ _serverStats.merge(serverStats); + + long threadCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.THREAD_CPU_TIME_NS); + long systemActivitiesCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS); + long responseSerializationCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS); + + String tableName = serverStats.getString(DataTable.MetadataKey.TABLE); + if (tableName != null) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + + if (tableType == TableType.OFFLINE) { + _offlineThreadCpuTimeNs += threadCpuTimeNs; + _offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs; + _offlineResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs; + } else { + _realtimeThreadCpuTimeNs += threadCpuTimeNs; + _realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs; + _realtimeResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs; + } Review Comment: Good catch! I guess this is an example where we cannot directly expose V1 stats as V2 stats, even for the leaf stage. A V1 server request only touches one table (either OFFLINE or REALTIME), but a V2 leaf stage can touch 1 or 2 tables. To solve this, I think we should read the V1 stats in the leaf stage and convert them into leaf stage stats there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org