This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new c5dd6df27c  Add Statistics grouped at Stage ID level in the V2 Engine Response (#10337)
c5dd6df27c is described below

commit c5dd6df27c384b523c77adcb66956bf0e5c37251
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Feb 28 15:49:46 2023 +0530

    Add Statistics grouped at Stage ID level in the V2 Engine Response (#10337)

    * WIP: aggregate stats on stage level for response

    * Make response backward compatible

    * Add new metadata keys to enum and replace hardcoded values; also add table names to the stats

    * Rename operatorExecutionTime to stageExecutionTime for correct understanding

    * Remove sysout

    * Remove duplicate code inside BrokerResponseStats class

    * Remove unused constants from OperatorUtils and fix formatting

    * Add test for stage level stats as well as BrokerResponseNativeV2

    * Add followup TODOs and move method to utils class

    ---------

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../MultiStageBrokerRequestHandler.java            |  36 ++++++-
 .../apache/pinot/common/datatable/DataTable.java   |  15 ++-
 .../response/broker/BrokerResponseNativeV2.java    |  93 +++++++++++++++++
 .../response/broker/BrokerResponseStats.java       | 110 +++++++++++++++++++++
 .../query/reduce/ExecutionStatsAggregator.java     |  76 ++++++++++++--
 .../query/runtime/operator/MultiStageOperator.java |   2 +
 .../query/runtime/operator/OperatorStats.java      |  15 ++-
 .../runtime/operator/utils/OperatorUtils.java      |  14 ++-
 .../pinot/query/service/QueryDispatcher.java       |  20 ++--
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   7 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  56 ++++++++---
 11 files changed, 395 insertions(+), 49 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 63c1e8f9ad..e2dae3b75f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,8 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -58,6 +61,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,7 +137,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
   }
 
-  private BrokerResponseNative handleRequest(long requestId, String query,
+  private BrokerResponse handleRequest(long requestId, String query,
       @Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity,
       RequestContext requestContext)
       throws Exception {
@@ -166,16 +170,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     }
 
     ResultTable queryResults;
-    ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
+    Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
+    for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
+      stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false));
+    }
+
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
-          sqlNodeAndOptions.getOptions(), executionStatsAggregator);
+          sqlNodeAndOptions.getOptions(), stageIdStatsMap);
     } catch (Exception e) {
       LOGGER.info("query execution failed", e);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     }
 
-    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
     long executionEndTimeNs = System.nanoTime();
 
     // Set total query processing time
@@ -184,7 +192,25 @@
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
 
-    executionStatsAggregator.setStats(brokerResponse);
+    for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
+      if (entry.getKey() == 0) {
+        // Root stats are aggregated and added separately to broker response for backward compatibility
+        entry.getValue().setStats(brokerResponse);
+        continue;
+      }
+
+      BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+      List<String> tableNames = queryPlan.getStageMetadataMap().get(entry.getKey()).getScannedTables();
+      if (tableNames.size() > 0) {
+        //TODO: Only using first table to assign broker metrics
+        // find a way to split metrics in case of multiple table
+        String rawTableName = TableNameBuilder.extractRawTableName(tableNames.get(0));
+        entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats, _brokerMetrics);
+      } else {
+        entry.getValue().setStageLevelStats(null, brokerResponseStats, null);
+      }
+      brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
+    }
 
     requestContext.setQueryProcessingTime(totalTimeMs);
     augmentStatistics(requestContext, brokerResponse);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 9bac9706a7..ea996cba6d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -102,7 +102,7 @@ public interface DataTable {
    */
  enum MetadataKey {
     UNKNOWN(0, "unknown", MetadataValueType.STRING),
-    TABLE(1, "table", MetadataValueType.STRING), // NOTE: this key is only used in PrioritySchedulerTest
+    TABLE(1, "table", MetadataValueType.STRING),
     NUM_DOCS_SCANNED(2, "numDocsScanned", MetadataValueType.LONG),
     NUM_ENTRIES_SCANNED_IN_FILTER(3, "numEntriesScannedInFilter", MetadataValueType.LONG),
     NUM_ENTRIES_SCANNED_POST_FILTER(4, "numEntriesScannedPostFilter", MetadataValueType.LONG),
@@ -110,8 +110,6 @@ public interface DataTable {
     NUM_SEGMENTS_PROCESSED(6, "numSegmentsProcessed", MetadataValueType.INT),
     NUM_SEGMENTS_MATCHED(7, "numSegmentsMatched", MetadataValueType.INT),
     NUM_CONSUMING_SEGMENTS_QUERIED(8, "numConsumingSegmentsQueried", MetadataValueType.INT),
-    NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed", MetadataValueType.INT),
-    NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched", MetadataValueType.INT),
     MIN_CONSUMING_FRESHNESS_TIME_MS(9, "minConsumingFreshnessTimeMs", MetadataValueType.LONG),
     TOTAL_DOCS(10, "totalDocs", MetadataValueType.LONG),
     NUM_GROUPS_LIMIT_REACHED(11, "numGroupsLimitReached", MetadataValueType.STRING),
@@ -128,11 +126,17 @@ public interface DataTable {
     NUM_SEGMENTS_PRUNED_BY_LIMIT(22, "numSegmentsPrunedByLimit", MetadataValueType.INT),
     NUM_SEGMENTS_PRUNED_BY_VALUE(23, "numSegmentsPrunedByValue", MetadataValueType.INT),
     EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS(24, "explainPlanNumEmptyFilterSegments", MetadataValueType.INT),
-    EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25, "explainPlanNumMatchAllFilterSegments", MetadataValueType.INT);
+    EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25, "explainPlanNumMatchAllFilterSegments", MetadataValueType.INT),
+    NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed", MetadataValueType.INT),
+    NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched", MetadataValueType.INT),
+    NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT),
+    NUM_ROWS(29, "numRows", MetadataValueType.INT),
+    OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", MetadataValueType.LONG),
+    OPERATOR_ID(31, "operatorId", MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 27;
+    private static final int MAX_ID = 31;
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
@@ -186,6 +190,7 @@ public interface DataTable {
         int id = key.getId();
         Preconditions.checkArgument(id >= 0 && id <= MAX_ID,
             "Invalid id: %s for MetadataKey: %s, must be in the range of [0, MAX_ID(%s)]", id, key, MAX_ID);
+        Preconditions.checkArgument(ID_TO_ENUM_KEY_MAP[id] == null,
            "Duplicate id: %s defined for MetadataKey: %s and %s", id, ID_TO_ENUM_KEY_MAP[id], key);
         ID_TO_ENUM_KEY_MAP[id] = key;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
new file mode 100644
index 0000000000..79605773d7
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * This class implements pinot-broker's response format for any given query.
+ * All fields either primitive data types, or native objects (as opposed to JSONObjects).
+ *
+ * Supports serialization via JSON.
+ */
+@JsonPropertyOrder({
+    "resultTable", "stageStats", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
+    "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+    "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
+    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
+    "traceInfo"
+})
+public class BrokerResponseNativeV2 extends BrokerResponseNative {
+  private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>();
+
+  public BrokerResponseNativeV2() {
+  }
+
+  public BrokerResponseNativeV2(ProcessingException exception) {
+    super(exception);
+  }
+
+  public BrokerResponseNativeV2(List<ProcessingException> exceptions) {
+    super(exceptions);
+  }
+
+  /** Generate EXPLAIN PLAN output when queries are evaluated by Broker without going to the Server. */
+  private static BrokerResponseNativeV2 getBrokerResponseExplainPlanOutput() {
+    BrokerResponseNativeV2 brokerResponse = BrokerResponseNativeV2.empty();
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{"BROKER_EVALUATE", 0, -1});
+    brokerResponse.setResultTable(new ResultTable(DataSchema.EXPLAIN_RESULT_SCHEMA, rows));
+    return brokerResponse;
+  }
+
+  /**
+   * Get a new empty {@link BrokerResponseNativeV2}.
+   */
+  public static BrokerResponseNativeV2 empty() {
+    return new BrokerResponseNativeV2();
+  }
+
+  public static BrokerResponseNativeV2 fromJsonString(String jsonString)
+      throws IOException {
+    return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class);
+  }
+
+  public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStats) {
+    if (!brokerResponseStats.getOperatorIds().isEmpty()) {
+      _stageIdStats.put(stageId, brokerResponseStats);
+    }
+  }
+
+  @JsonProperty("stageStats")
+  public Map<Integer, BrokerResponseStats> getStageIdStats() {
+    return _stageIdStats;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
new file mode 100644
index 0000000000..60d7ab4813
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+// TODO: Decouple the execution stats aggregator logic and make it into a util that can aggregate 2 values with the
+//  same metadataKey
+// TODO: Replace member fields with a simple map of <MetadataKey, Object>
+// TODO: Add a subStat field, stage level subStats will contain each operator stats
+@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "numServersQueried",
+    "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
+    "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached",
+    "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
+    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
+    "traceInfo", "operatorIds", "tableNames"})
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class BrokerResponseStats extends BrokerResponseNative {
+
+  private int _numBlocks = 0;
+  private int _numRows = 0;
+  private long _stageExecutionTimeMs = 0;
+  private List<String> _operatorIds = new ArrayList<>();
+  private List<String> _tableNames = new ArrayList<>();
+
+  @Override
+  public ResultTable getResultTable() {
+    return null;
+  }
+
+  @JsonProperty("numBlocks")
+  public int getNumBlocks() {
+    return _numBlocks;
+  }
+
+  @JsonProperty("numBlocks")
+  public void setNumBlocks(int numBlocks) {
+    _numBlocks = numBlocks;
+  }
+
+  @JsonProperty("numRows")
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @JsonProperty("numRows")
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @JsonProperty("stageExecutionTimeMs")
+  public long getStageExecutionTimeMs() {
+    return _stageExecutionTimeMs;
+  }
+
+  @JsonProperty("stageExecutionTimeMs")
+  public void setStageExecutionTimeMs(long stageExecutionTimeMs) {
+    _stageExecutionTimeMs = stageExecutionTimeMs;
+  }
+
+  public String toJsonString()
+      throws IOException {
+    return JsonUtils.objectToString(this);
+  }
+
+  @JsonProperty("operatorIds")
+  public List<String> getOperatorIds() {
+    return _operatorIds;
+  }
+
+  @JsonProperty("operatorIds")
+  public void setOperatorIds(List<String> operatorIds) {
+    _operatorIds = operatorIds;
+  }
+
+  @JsonProperty("tableNames")
+  public List<String> getTableNames() {
+    return _tableNames;
+  }
+
+  @JsonProperty("tableNames")
+  public void setTableNames(List<String> tableNames) {
+    _tableNames = tableNames;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 0c93252c51..889b859c29 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -19,24 +19,32 @@
 package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class ExecutionStatsAggregator {
   private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
+  private final List<String> _operatorIds = new ArrayList<>();
+  private final Set<String> _tableNames = new HashSet<>();
   private final Map<String, String> _traceInfo = new HashMap<>();
   private final boolean _enableTrace;
 
@@ -66,6 +74,9 @@ public class ExecutionStatsAggregator {
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
   private boolean _numGroupsLimitReached = false;
+  private int _numBlocks = 0;
+  private int _numRows = 0;
+  private long _stageExecutionTimeMs = 0;
 
   public ExecutionStatsAggregator(boolean enableTrace) {
     _enableTrace = enableTrace;
@@ -82,6 +93,32 @@ public class ExecutionStatsAggregator {
       _traceInfo.put(routingInstance.getShortName(), metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
     }
 
+    String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName());
+    String tableName = null;
+
+    if (tableNamesStr != null) {
+      List<String> tableNames = Arrays.stream(tableNamesStr.split("::")).collect(Collectors.toList());
+      _tableNames.addAll(tableNames);
+
+      //TODO: Decide a strategy to split stageLevel stats across tables for brokerMetrics
+      // assigning everything to the first table only for now
+      tableName = tableNames.get(0);
+    }
+
+    TableType tableType = null;
+    if (routingInstance != null) {
+      tableType = routingInstance.getTableType();
+    } else if (tableName != null) {
+      tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    } else {
+      tableType = null;
+    }
+
+    String operatorId = metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
+    if (operatorId != null) {
+      _operatorIds.add(operatorId);
+    }
+
     // Reduce on exceptions.
     for (int key : exceptions.keySet()) {
       _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
@@ -141,8 +178,8 @@ public class ExecutionStatsAggregator {
     }
 
     String threadCpuTimeNsString = metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
-    if (routingInstance != null && threadCpuTimeNsString != null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && threadCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
       } else {
         _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
@@ -151,8 +188,8 @@ public class ExecutionStatsAggregator {
 
     String systemActivitiesCpuTimeNsString =
         metadata.get(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
-    if (routingInstance != null && systemActivitiesCpuTimeNsString != null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && systemActivitiesCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
       } else {
         _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
@@ -161,8 +198,8 @@ public class ExecutionStatsAggregator {
 
     String responseSerializationCpuTimeNsString =
         metadata.get(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    if (routingInstance != null && responseSerializationCpuTimeNsString != null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && responseSerializationCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
       } else {
         _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
@@ -200,6 +237,22 @@ public class ExecutionStatsAggregator {
     }
     _numGroupsLimitReached |=
         Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+
+    String numBlocksString = metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
+    if (numBlocksString != null) {
+      _numBlocks += Long.parseLong(numBlocksString);
+    }
+
+    String numRowsString = metadata.get(DataTable.MetadataKey.NUM_ROWS.getName());
+    if (numBlocksString != null) {
+      _numRows += Long.parseLong(numRowsString);
+    }
+
+    String operatorExecutionTimeString = metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName());
+    if (operatorExecutionTimeString != null) {
+      _stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString);
+    }
   }
 
   public void setStats(BrokerResponseNative brokerResponseNative) {
@@ -280,6 +333,17 @@ public class ExecutionStatsAggregator {
     }
   }
 
+  public void setStageLevelStats(@Nullable String rawTableName, BrokerResponseStats brokerResponseStats,
+      @Nullable BrokerMetrics brokerMetrics) {
+    setStats(rawTableName, brokerResponseStats, brokerMetrics);
+
+    brokerResponseStats.setNumBlocks(_numBlocks);
+    brokerResponseStats.setNumRows(_numRows);
+    brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
+    brokerResponseStats.setOperatorIds(_operatorIds);
+    brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
+  }
+
   private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
     String strValue = metadata.get(key.getName());
     if (strValue != null) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index d9c0104d91..a6bc863457 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -74,6 +75,7 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
       }
       if (!_operatorStats.getExecutionStats().isEmpty()) {
+        _operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), _operatorId);
         _operatorStatsMap.put(_operatorId, _operatorStats);
       }
       return TransferableBlockUtils.getEndOfStreamTransferableBlock(
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 09a8d7ac60..74cefbbc90 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -22,6 +22,7 @@ import com.google.common.base.Stopwatch;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 
@@ -38,7 +39,7 @@ public class OperatorStats {
 
   private int _numBlock = 0;
   private int _numRows = 0;
-  private Map<String, String> _executionStats;
+  private final Map<String, String> _executionStats;
 
   public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) {
     _stageId = stageId;
@@ -65,14 +66,18 @@ public class OperatorStats {
     _numRows += numRows;
   }
 
+  public void recordSingleStat(String key, String stat) {
+    _executionStats.put(key, stat);
+  }
+
   public void recordExecutionStats(Map<String, String> executionStats) {
-    _executionStats = executionStats;
+    _executionStats.putAll(executionStats);
   }
 
   public Map<String, String> getExecutionStats() {
-    _executionStats.put(OperatorUtils.NUM_BLOCKS, String.valueOf(_numBlock));
-    _executionStats.put(OperatorUtils.NUM_ROWS, String.valueOf(_numRows));
-    _executionStats.put(OperatorUtils.THREAD_EXECUTION_TIME,
+    _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_BLOCKS.getName(), String.valueOf(_numBlock));
+    _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(), String.valueOf(_numRows));
+    _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
         String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
     return _executionStats;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 532befd702..ab79339891 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -20,10 +20,13 @@ package org.apache.pinot.query.runtime.operator.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -32,10 +35,6 @@ import org.slf4j.LoggerFactory;
 
 
 public class OperatorUtils {
 
-  public static final String NUM_BLOCKS = "numBlocks";
-  public static final String NUM_ROWS = "numRows";
-  public static final String THREAD_EXECUTION_TIME = "threadExecutionTime";
-
   private static final Logger LOGGER = LoggerFactory.getLogger(OperatorUtils.class);
   private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new HashMap<>();
 
@@ -69,6 +68,13 @@ public class OperatorUtils {
     return functionName;
   }
 
+  public static void recordTableName(OperatorStats operatorStats, StageMetadata operatorStageMetadata) {
+    if (!operatorStageMetadata.getScannedTables().isEmpty()) {
+      operatorStats.recordSingleStat(DataTable.MetadataKey.TABLE.getName(),
+          Joiner.on("::").join(operatorStageMetadata.getScannedTables()));
+    }
+  }
+
   public static String operatorStatsToJson(OperatorStats operatorStats) {
     try {
       Map<String, Object> jsonOut = new HashMap<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 9c7b73a9e0..18511d9ee7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -69,7 +69,7 @@ public class QueryDispatcher {
 
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions,
-      ExecutionStatsAggregator executionStatsAggregator)
+      Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
     // submit all the distributed stages.
     int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
@@ -80,7 +80,7 @@ public class QueryDispatcher {
         reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
         new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
     List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator);
+        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator, queryPlan);
     return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
         queryPlan.getQueryStageMap().get(0).getDataSchema());
   }
@@ -128,11 +128,11 @@ public class QueryDispatcher {
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs) {
-    return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null);
+    return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null, null);
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
-      @Nullable ExecutionStatsAggregator executionStatsAggregator) {
+      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan) {
     List<DataBlock> resultDataBlocks = new ArrayList<>();
     TransferableBlock transferableBlock;
     long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -147,11 +147,19 @@
       if (transferableBlock.isNoOpBlock()) {
         continue;
       } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (executionStatsAggregator != null) {
+        if (executionStatsAggregatorMap != null) {
           for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
             LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
                 OperatorUtils.operatorStatsToJson(entry.getValue()));
-            executionStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+            OperatorStats operatorStats = entry.getValue();
+            ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
+            ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
+            if (queryPlan != null) {
+              StageMetadata operatorStageMetadata = queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
+              OperatorUtils.recordTableName(operatorStats, operatorStageMetadata);
+            }
+            rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+            stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
           }
         }
         return resultDataBlocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 6ebc8cdb1c..53be2be4a5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -82,7 +82,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
   // --------------------------------------------------------------------------
   // QUERY UTILS
   // --------------------------------------------------------------------------
-  protected List<Object[]> queryRunner(String sql, ExecutionStatsAggregator executionStatsAggregator) {
+  protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     Map<String, String> requestMetadataMap =
         ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()),
@@ -105,11 +105,14 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
           _servers.get(serverInstance.getServer()).processQuery(distributedStagePlan, requestMetadataMap);
         }
       }
+      if (executionStatsAggregatorMap != null) {
+        executionStatsAggregatorMap.put(stageId, new ExecutionStatsAggregator(false));
+      }
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
     return QueryDispatcher.toResultTable(
         QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
-            executionStatsAggregator),
+            executionStatsAggregatorMap, null),
         queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index d311e9f467..5b57f59ab0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -38,7 +38,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
@@ -228,19 +229,40 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   @Test(dataProvider = "testResourceQueryTestCaseProviderWithMetadata")
   public void testQueryTestCasesWithMetadata(String testCaseName, String sql, String expect, int numSegments)
       throws Exception {
-    ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
-    runQuery(sql, expect, executionStatsAggregator).ifPresent(rows -> {
-      BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
-      executionStatsAggregator.setStats(brokerResponseNative);
+    Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap = new HashMap<>();
+    runQuery(sql, expect, executionStatsAggregatorMap).ifPresent(rows -> {
+      BrokerResponseNativeV2 brokerResponseNative = new BrokerResponseNativeV2();
+      executionStatsAggregatorMap.get(0).setStats(brokerResponseNative);
+      Assert.assertFalse(executionStatsAggregatorMap.isEmpty());
+      for (Integer stageId : executionStatsAggregatorMap.keySet()) {
+        if (stageId > 0) {
+          BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+          executionStatsAggregatorMap.get(stageId).setStageLevelStats(null, brokerResponseStats, null);
+          brokerResponseNative.addStageStat(stageId, brokerResponseStats);
+        }
+      }
+
       Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), numSegments);
+
+      Map<Integer, BrokerResponseStats> stageIdStats = brokerResponseNative.getStageIdStats();
+      for (Integer stageId : stageIdStats.keySet()) {
+        // check stats only for leaf stage
+        BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
+        if (!brokerResponseStats.getTableNames().isEmpty()) {
+          Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
+          String tableName = brokerResponseStats.getTableNames().get(0);
+          Assert.assertNotNull(_tableToSegmentMap.get(tableName));
+          Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), _tableToSegmentMap.get(tableName).size());
+        }
+      }
     });
   }
 
   private Optional<List<Object[]>> runQuery(String sql, final String except,
-      ExecutionStatsAggregator executionStatsAggregator) {
+      Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
     try {
       // query pinot
-      List<Object[]> resultRows = queryRunner(sql, executionStatsAggregator);
+      List<Object[]> resultRows = queryRunner(sql, executionStatsAggregatorMap);
 
       Assert.assertNull(except,
           "Expected error with message '" + except + "'. But instead rows were returned: " + resultRows.stream()
@@ -299,29 +321,30 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       throws Exception {
     Map<String, QueryTestCase> testCaseMap = getTestCases();
     List<Object[]> providerContent = new ArrayList<>();
+    Set<String> validTestCases = new HashSet<>();
+    validTestCases.add("basic_test");
+    validTestCases.add("framework_test");
+
     for (Map.Entry<String, QueryTestCase> testCaseEntry : testCaseMap.entrySet()) {
       String testCaseName = testCaseEntry.getKey();
+      if (!validTestCases.contains(testCaseName)) {
+        continue;
+      }
+
       if (testCaseEntry.getValue()._ignored) {
        continue;
      }

       List<QueryTestCase.Query> queryCases = testCaseEntry.getValue()._queries;
       for (QueryTestCase.Query queryCase : queryCases) {
+
         if (queryCase._ignored) {
           continue;
         }
 
-        if (queryCase._outputs != null) {
+        if (queryCase._outputs == null) {
           String sql = replaceTableName(testCaseName, queryCase._sql);
-          if (!sql.contains("basic_test")) {
-            continue;
-          }
-          List<List<Object>> orgRows = queryCase._outputs;
-          List<Object[]> expectedRows = new ArrayList<>(orgRows.size());
-          for (List<Object> objs : orgRows) {
-            expectedRows.add(objs.toArray());
-          }
           int segmentCount = 0;
           for (String tableName : testCaseEntry.getValue()._tables.keySet()) {
             segmentCount +=
@@ -401,6 +424,7 @@
     {
       HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
       hashSet.retainAll(testCases.keySet());
+
      if (!hashSet.isEmpty()) {
        throw new IllegalArgumentException("testCase already exist for the following names: " + hashSet);
      }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
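For readers following the change, the sketch below reads the new stage-level statistics back off a populated BrokerResponseNativeV2. It is only an illustration, not part of the commit: the class and getters it calls (getStageIdStats(), getNumBlocks(), getNumRows(), getStageExecutionTimeMs(), getOperatorIds(), getTableNames()) come from the patch above, while the StageStatsExample class and its printStageStats method are hypothetical names introduced here for the example. Stage 0 (the root/reduce stage) keeps feeding the pre-existing top-level response fields for backward compatibility, so only stages whose aggregator recorded operator ids show up in the per-stage map.

import java.util.Map;

import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;

public class StageStatsExample {
  // Walks the per-stage statistics that the V2 engine now attaches to the broker response.
  // In the serialized JSON these entries appear under the "stageStats" key, keyed by stage id,
  // with default-valued fields omitted because BrokerResponseStats uses @JsonInclude(NON_DEFAULT).
  public static void printStageStats(BrokerResponseNativeV2 response) {
    for (Map.Entry<Integer, BrokerResponseStats> entry : response.getStageIdStats().entrySet()) {
      BrokerResponseStats stats = entry.getValue();
      System.out.println("stage " + entry.getKey()
          + ": numBlocks=" + stats.getNumBlocks()
          + ", numRows=" + stats.getNumRows()
          + ", stageExecutionTimeMs=" + stats.getStageExecutionTimeMs()
          + ", operatorIds=" + stats.getOperatorIds()
          + ", tableNames=" + stats.getTableNames());
    }
  }
}

Because BrokerResponseStats extends BrokerResponseNative, the familiar counters such as numSegmentsQueried are also available per stage, which is what the new ResourceBasedQueriesTest assertions rely on.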