Jackie-Jiang commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1580275410
##########
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java:
##########
@@ -77,20 +100,301 @@ public static BrokerResponseNativeV2 empty() {
return new BrokerResponseNativeV2();
}
- public static BrokerResponseNativeV2 fromJsonString(String jsonString)
- throws IOException {
- return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class);
+ public void addStageStats(JsonNode stageStats) {
+ ObjectNode node = JsonUtils.newObjectNode();
+ node.put("stage", _stageIdStats.size());
+ node.set("stats", stageStats);
+ _stageIdStats.add(node);
+ }
+
+ @JsonProperty
+ public List<JsonNode> getStageStats() {
+ return _stageIdStats;
+ }
+
+ @JsonProperty
+ public long getMaxRows() {
+ return _maxRows;
+ }
+
+ public void mergeMaxRows(long maxRows) {
+ _maxRows = Math.max(_maxRows, maxRows);
+ }
+
+ @Override
+ public void setTimeUsedMs(long timeUsedMs) {
+ _serverStats.merge(DataTable.MetadataKey.TIME_USED_MS, timeUsedMs);
+ }
+
+ @Override
+ public long getNumDocsScanned() {
+ return _serverStats.getLong(DataTable.MetadataKey.NUM_DOCS_SCANNED);
+ }
+
+ @Override
+ public long getNumEntriesScannedInFilter() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER);
+ }
+
+ @Override
+ public long getNumEntriesScannedPostFilter() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER);
+ }
+
+ @Override
+ public long getNumSegmentsQueried() {
+ return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED);
+ }
+
+ @Override
+ public long getNumSegmentsProcessed() {
+ return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED);
+ }
+
+ @Override
+ public long getNumSegmentsMatched() {
+ return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED);
+ }
+
+ @Override
+ public long getNumConsumingSegmentsQueried() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED);
+ }
+
+ @Override
+ public long getNumConsumingSegmentsProcessed() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED);
+ }
+
+ @Override
+ public long getNumConsumingSegmentsMatched() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED);
+ }
+
+ @Override
+ public long getMinConsumingFreshnessTimeMs() {
+ return
_serverStats.getLong(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS);
+ }
+
+ @Override
+ public long getTotalDocs() {
+ return _serverStats.getLong(DataTable.MetadataKey.TOTAL_DOCS);
+ }
+
+ @Override
+ public boolean isNumGroupsLimitReached() {
+ return
_serverStats.getBoolean(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED);
+ }
+
+ public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) {
+ _serverStats.merge(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED,
numGroupsLimitReached);
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByServer() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER);
+ }
+
+ @Override
+ public long getNumSegmentsPrunedInvalid() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID);
}
- public void addStageStat(Integer stageId, BrokerResponseStats
brokerResponseStats) {
- // StageExecutionWallTime will always be there, other stats are optional
such as OperatorStats
- if (brokerResponseStats.getStageExecWallTimeMs() != -1) {
- _stageIdStats.put(stageId, brokerResponseStats);
+ @Override
+ public long getNumSegmentsPrunedByLimit() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT);
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByValue() {
+ return
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE);
+ }
+
+ @Override
+ public long getExplainPlanNumEmptyFilterSegments() {
+ return
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS);
+ }
+
+ @Override
+ public long getExplainPlanNumMatchAllFilterSegments() {
+ return
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS);
+ }
+
+ @Override
+ public long getOfflineTotalCpuTimeNs() {
+ return getOfflineThreadCpuTimeNs() + getOfflineSystemActivitiesCpuTimeNs()
+ + getOfflineResponseSerializationCpuTimeNs();
+ }
+
+ @Override
+ public long getRealtimeTotalCpuTimeNs() {
+ return getRealtimeThreadCpuTimeNs() +
getRealtimeSystemActivitiesCpuTimeNs()
+ + getRealtimeResponseSerializationCpuTimeNs();
+ }
+
+ @Override
+ public void setExceptions(List<ProcessingException> exceptions) {
+ for (ProcessingException exception : exceptions) {
+ _processingExceptions.add(new
QueryProcessingException(exception.getErrorCode(), exception.getMessage()));
}
}
- @JsonProperty("stageStats")
- public Map<Integer, BrokerResponseStats> getStageIdStats() {
- return _stageIdStats;
+ public void addToExceptions(QueryProcessingException processingException) {
+ _processingExceptions.add(processingException);
+ }
+
+ @Override
+ public int getNumServersQueried() {
+ return _numServersQueried;
+ }
+
+ @Override
+ public void setNumServersQueried(int numServersQueried) {
+ _numServersQueried = numServersQueried;
+ }
+
+ @Override
+ public int getNumServersResponded() {
+ return _numServersResponded;
+ }
+
+ @Override
+ public void setNumServersResponded(int numServersResponded) {
+ _numServersResponded = numServersResponded;
+ }
+
+ @JsonProperty("maxRowsInJoinReached")
+ public boolean isMaxRowsInJoinReached() {
+ return _maxRowsInJoinReached;
+ }
+
+ @JsonProperty("maxRowsInJoinReached")
+ public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
+ _maxRowsInJoinReached |= maxRowsInJoinReached;
+ }
+
+ @Override
+ public int getExceptionsSize() {
+ return _processingExceptions.size();
+ }
+
+ @Override
+ public void setResultTable(ResultTable resultTable) {
+ _resultTable = resultTable;
+ }
+
+ @Override
+ public ResultTable getResultTable() {
+ return _resultTable;
+ }
+
+ @Override
+ public List<QueryProcessingException> getProcessingExceptions() {
+ return List.of();
+ }
+
+ @Override
+ public int getNumRowsResultSet() {
+ return 0;
+ }
+
+ @Override
+ public long getOfflineThreadCpuTimeNs() {
+ return _offlineThreadCpuTimeNs;
+ }
+
+ @Override
+ public long getRealtimeThreadCpuTimeNs() {
+ return _realtimeThreadCpuTimeNs;
+ }
+
+ @Override
+ public long getOfflineSystemActivitiesCpuTimeNs() {
+ return _offlineSystemActivitiesCpuTimeNs;
+ }
+
+ @Override
+ public long getRealtimeSystemActivitiesCpuTimeNs() {
+ return _realtimeSystemActivitiesCpuTimeNs;
+ }
+
+ @Override
+ public long getOfflineResponseSerializationCpuTimeNs() {
+ return _offlineResponseSerializationCpuTimeNs;
+ }
+
+ @Override
+ public long getRealtimeResponseSerializationCpuTimeNs() {
+ return _realtimeResponseSerializationCpuTimeNs;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByBroker() {
+ return _numSegmentsPrunedByBroker;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
+ _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+ }
+
+ @Override
+ public String getRequestId() {
+ return _requestId;
+ }
+
+ @Override
+ public void setRequestId(String requestId) {
+ _requestId = requestId;
+ }
+
+ @Override
+ public String getBrokerId() {
+ return _brokerId;
+ }
+
+ @Override
+ public void setBrokerId(String requestId) {
+ _brokerId = requestId;
+ }
+
+ @Override
+ public long getBrokerReduceTimeMs() {
+ return _brokerReduceTimeMs;
+ }
+
+ @Override
+ public void setBrokerReduceTimeMs(long brokerReduceTimeMs) {
+ _brokerReduceTimeMs = brokerReduceTimeMs;
+ }
+
+ @JsonProperty(access = JsonProperty.Access.READ_ONLY)
+ @Override
+ public boolean isPartialResult() {
+ return isNumGroupsLimitReached() || getExceptionsSize() > 0 ||
isMaxRowsInJoinReached();
+ }
+
+ public void addServerStats(StatMap<DataTable.MetadataKey> serverStats) {
+ // Set execution statistics.
+ _serverStats.merge(serverStats);
+
+ long threadCpuTimeNs =
serverStats.getLong(DataTable.MetadataKey.THREAD_CPU_TIME_NS);
+ long systemActivitiesCpuTimeNs =
serverStats.getLong(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS);
+ long responseSerializationCpuTimeNs =
serverStats.getLong(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS);
+
+ String tableName = serverStats.getString(DataTable.MetadataKey.TABLE);
+ if (tableName != null) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+
+ if (tableType == TableType.OFFLINE) {
+ _offlineThreadCpuTimeNs += threadCpuTimeNs;
+ _offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
+ _offlineResponseSerializationCpuTimeNs +=
responseSerializationCpuTimeNs;
+ } else {
+ _realtimeThreadCpuTimeNs += threadCpuTimeNs;
+ _realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
+ _realtimeResponseSerializationCpuTimeNs +=
responseSerializationCpuTimeNs;
+ }
Review Comment:
Oh, I think I misunderstood the issue, and we are already reposting the
stats in leaf stage operator. And now I understand better why we need to make
V1 stats also implementing `StatMap.Key`.
I guess we can just leave them as 0. People can lookup the V2 stats to get
the details
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]