This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f28525b417 Add operator level stats to response when tracing is enabled (#10364) f28525b417 is described below commit f28525b4176b39c45bf326bd8ecc6234cee1e027 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Mon Mar 6 12:05:42 2023 +0530 Add operator level stats to response when tracing is enabled (#10364) * Add operator level stats to response when tracing is enabled * Add tests for operatorStats on tracing --- .../MultiStageBrokerRequestHandler.java | 6 ++++- .../response/broker/BrokerResponseNativeV2.java | 2 +- .../response/broker/BrokerResponseStats.java | 18 +++++++------ .../api/resources/PinotQueryResource.java | 6 ++--- .../query/reduce/ExecutionStatsAggregator.java | 12 ++++++--- .../pinot/query/runtime/QueryRunnerTestBase.java | 4 +-- .../runtime/queries/ResourceBasedQueriesTest.java | 30 ++++++++++++++++++---- 7 files changed, 54 insertions(+), 24 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 17f018f047..9ebc6ea6b4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -169,10 +169,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } + boolean traceEnabled = Boolean.parseBoolean( + request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText() + : "false"); + ResultTable queryResults; Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>(); for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) { - stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false)); + stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled)); } try { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 79605773d7..5bf631e129 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -81,7 +81,7 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative { } public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStats) { - if (!brokerResponseStats.getOperatorIds().isEmpty()) { + if (!brokerResponseStats.getOperatorStats().isEmpty()) { _stageIdStats.put(stageId, brokerResponseStats); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java index 60d7ab4813..83cbd16f3c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.pinot.spi.utils.JsonUtils; @@ -38,14 +40,14 @@ import org.apache.pinot.spi.utils.JsonUtils; "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", - "traceInfo", "operatorIds", "tableNames"}) + "traceInfo", "operatorStats", "tableNames"}) @JsonInclude(JsonInclude.Include.NON_DEFAULT) public class BrokerResponseStats extends BrokerResponseNative { private int _numBlocks = 0; private int _numRows = 0; private long _stageExecutionTimeMs = 0; - private List<String> _operatorIds = new ArrayList<>(); + private Map<String, Map<String, String>> _operatorStats = new HashMap<>(); private List<String> _tableNames = new ArrayList<>(); @Override @@ -88,14 +90,14 @@ public class BrokerResponseStats extends BrokerResponseNative { return JsonUtils.objectToString(this); } - @JsonProperty("operatorIds") - public List<String> getOperatorIds() { - return _operatorIds; + @JsonProperty("operatorStats") + public Map<String, Map<String, String>> getOperatorStats() { + return _operatorStats; } - @JsonProperty("operatorIds") - public void setOperatorIds(List<String> operatorIds) { - _operatorIds = operatorIds; + @JsonProperty("operatorStats") + public void setOperatorStats(Map<String, Map<String, String>> operatorStats) { + _operatorStats = operatorStats; } @JsonProperty("tableNames") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index 6dd546c778..856632350e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -138,7 +138,7 @@ public class PinotQueryResource { if (Boolean.parseBoolean(options.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) { if (_controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { - return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, endpointUrl); + return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, endpointUrl, traceEnabled); } else { throw new UnsupportedOperationException("V2 Multi-Stage query engine not enabled. " + "Please see https://docs.pinot.apache.org/ for instruction to enable V2 engine."); @@ -161,7 +161,7 @@ public class PinotQueryResource { } private String getMultiStageQueryResponse(String query, String queryOptions, HttpHeaders httpHeaders, - String endpointUrl) { + String endpointUrl, String traceEnabled) { // Validate data access // we don't have a cross table access control rule so only ADMIN can make request to multi-stage engine. @@ -185,7 +185,7 @@ public class PinotQueryResource { // Send query to a random broker. String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size())); - return sendRequestToBroker(query, instanceId, "false", queryOptions, httpHeaders); + return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, httpHeaders); } private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String traceEnabled, String queryOptions, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index 889b859c29..2faefee584 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -43,7 +43,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; public class ExecutionStatsAggregator { private final List<QueryProcessingException> _processingExceptions = new ArrayList<>(); - private final List<String> _operatorIds = new ArrayList<>(); + private final Map<String, Map<String, String>> _operatorStats = new HashMap<>(); private final Set<String> _tableNames = new HashSet<>(); private final Map<String, String> _traceInfo = new HashMap<>(); private final boolean _enableTrace; @@ -89,7 +89,7 @@ public class ExecutionStatsAggregator { public synchronized void aggregate(@Nullable ServerRoutingInstance routingInstance, Map<String, String> metadata, Map<Integer, String> exceptions) { // Reduce on trace info. - if (_enableTrace) { + if (_enableTrace && metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) { _traceInfo.put(routingInstance.getShortName(), metadata.get(DataTable.MetadataKey.TRACE_INFO.getName())); } @@ -116,7 +116,11 @@ public class ExecutionStatsAggregator { String operatorId = metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName()); if (operatorId != null) { - _operatorIds.add(operatorId); + if (_enableTrace) { + _operatorStats.put(operatorId, metadata); + } else { + _operatorStats.put(operatorId, new HashMap<>()); + } } // Reduce on exceptions. @@ -340,7 +344,7 @@ public class ExecutionStatsAggregator { brokerResponseStats.setNumBlocks(_numBlocks); brokerResponseStats.setNumRows(_numRows); brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs); - brokerResponseStats.setOperatorIds(_operatorIds); + brokerResponseStats.setOperatorStats(_operatorStats); brokerResponseStats.setTableNames(new ArrayList<>(_tableNames)); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index 8c6ed98ee8..9cc6cbc539 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -106,13 +106,13 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { } } if (executionStatsAggregatorMap != null) { - executionStatsAggregatorMap.put(stageId, new ExecutionStatsAggregator(false)); + executionStatsAggregatorMap.put(stageId, new ExecutionStatsAggregator(true)); } } Preconditions.checkNotNull(mailboxReceiveOperator); return QueryDispatcher.toResultTable( QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS, - executionStatsAggregatorMap, null), + executionStatsAggregatorMap, queryPlan), queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows(); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java index 5b57f59ab0..1169e66780 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.common.response.broker.BrokerResponseStats; @@ -248,11 +249,30 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase { for (Integer stageId : stageIdStats.keySet()) { // check stats only for leaf stage BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId); - if (!brokerResponseStats.getTableNames().isEmpty()) { - Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1); - String tableName = brokerResponseStats.getTableNames().get(0); - Assert.assertNotNull(_tableToSegmentMap.get(tableName)); - Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), _tableToSegmentMap.get(tableName).size()); + + if (brokerResponseStats.getTableNames().isEmpty()) { + continue; + } + + String tableName = brokerResponseStats.getTableNames().get(0); + Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1); + + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType == null) { + tableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + } + + Assert.assertNotNull(_tableToSegmentMap.get(tableName)); + Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), _tableToSegmentMap.get(tableName).size()); + + Assert.assertFalse(brokerResponseStats.getOperatorStats().isEmpty()); + Map<String, Map<String, String>> operatorStats = brokerResponseStats.getOperatorStats(); + for (Map.Entry<String, Map<String, String>> entry : operatorStats.entrySet()) { + if (entry.getKey().contains("LEAF_STAGE")) { + Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName())); + } else { + Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_BLOCKS.getName())); + } } } }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org