This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f28525b417 Add operator level stats to response when tracing is enabled (#10364)
f28525b417 is described below

commit f28525b4176b39c45bf326bd8ecc6234cee1e027
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Mon Mar 6 12:05:42 2023 +0530

    Add operator level stats to response when tracing is enabled (#10364)
    
    * Add operator level stats to response when tracing is enabled
    
    * Add tests for operatorStats on tracing
---
 .../MultiStageBrokerRequestHandler.java            |  6 ++++-
 .../response/broker/BrokerResponseNativeV2.java    |  2 +-
 .../response/broker/BrokerResponseStats.java       | 18 +++++++------
 .../api/resources/PinotQueryResource.java          |  6 ++---
 .../query/reduce/ExecutionStatsAggregator.java     | 12 ++++++---
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  4 +--
 .../runtime/queries/ResourceBasedQueriesTest.java  | 30 ++++++++++++++++++----
 7 files changed, 54 insertions(+), 24 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 17f018f047..9ebc6ea6b4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -169,10 +169,14 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
 
+    boolean traceEnabled = Boolean.parseBoolean(
+        request.has(CommonConstants.Broker.Request.TRACE) ? 
request.get(CommonConstants.Broker.Request.TRACE).asText()
+            : "false");
+
     ResultTable queryResults;
     Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
     for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
-      stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false));
+      stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
     }
 
     try {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 79605773d7..5bf631e129 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -81,7 +81,7 @@ public class BrokerResponseNativeV2 extends 
BrokerResponseNative {
   }
 
   public void addStageStat(Integer stageId, BrokerResponseStats 
brokerResponseStats) {
-    if (!brokerResponseStats.getOperatorIds().isEmpty()) {
+    if (!brokerResponseStats.getOperatorStats().isEmpty()) {
       _stageIdStats.put(stageId, brokerResponseStats);
     }
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index 60d7ab4813..83cbd16f3c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -38,14 +40,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", 
"realtimeThreadCpuTimeNs",
     "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
     "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
-    "traceInfo", "operatorIds", "tableNames"})
+    "traceInfo", "operatorStats", "tableNames"})
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class BrokerResponseStats extends BrokerResponseNative {
 
   private int _numBlocks = 0;
   private int _numRows = 0;
   private long _stageExecutionTimeMs = 0;
-  private List<String> _operatorIds = new ArrayList<>();
+  private Map<String, Map<String, String>> _operatorStats = new HashMap<>();
   private List<String> _tableNames = new ArrayList<>();
 
   @Override
@@ -88,14 +90,14 @@ public class BrokerResponseStats extends 
BrokerResponseNative {
     return JsonUtils.objectToString(this);
   }
 
-  @JsonProperty("operatorIds")
-  public List<String> getOperatorIds() {
-    return _operatorIds;
+  @JsonProperty("operatorStats")
+  public Map<String, Map<String, String>> getOperatorStats() {
+    return _operatorStats;
   }
 
-  @JsonProperty("operatorIds")
-  public void setOperatorIds(List<String> operatorIds) {
-    _operatorIds = operatorIds;
+  @JsonProperty("operatorStats")
+  public void setOperatorStats(Map<String, Map<String, String>> operatorStats) 
{
+    _operatorStats = operatorStats;
   }
 
   @JsonProperty("tableNames")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 6dd546c778..856632350e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -138,7 +138,7 @@ public class PinotQueryResource {
     if 
(Boolean.parseBoolean(options.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
       if 
(_controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED,
           CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
-        return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, 
endpointUrl);
+        return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, 
endpointUrl, traceEnabled);
       } else {
         throw new UnsupportedOperationException("V2 Multi-Stage query engine 
not enabled. "
             + "Please see https://docs.pinot.apache.org/ for instruction to 
enable V2 engine.");
@@ -161,7 +161,7 @@ public class PinotQueryResource {
   }
 
   private String getMultiStageQueryResponse(String query, String queryOptions, 
HttpHeaders httpHeaders,
-      String endpointUrl) {
+      String endpointUrl, String traceEnabled) {
 
     // Validate data access
     // we don't have a cross table access control rule so only ADMIN can make 
request to multi-stage engine.
@@ -185,7 +185,7 @@ public class PinotQueryResource {
 
     // Send query to a random broker.
     String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
-    return sendRequestToBroker(query, instanceId, "false", queryOptions, 
httpHeaders);
+    return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, 
httpHeaders);
   }
 
   private String getQueryResponse(String query, @Nullable SqlNode sqlNode, 
String traceEnabled, String queryOptions,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 889b859c29..2faefee584 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -43,7 +43,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 public class ExecutionStatsAggregator {
   private final List<QueryProcessingException> _processingExceptions = new 
ArrayList<>();
-  private final List<String> _operatorIds = new ArrayList<>();
+  private final Map<String, Map<String, String>> _operatorStats = new 
HashMap<>();
   private final Set<String> _tableNames = new HashSet<>();
   private final Map<String, String> _traceInfo = new HashMap<>();
   private final boolean _enableTrace;
@@ -89,7 +89,7 @@ public class ExecutionStatsAggregator {
   public synchronized void aggregate(@Nullable ServerRoutingInstance 
routingInstance, Map<String, String> metadata,
       Map<Integer, String> exceptions) {
     // Reduce on trace info.
-    if (_enableTrace) {
+    if (_enableTrace && 
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) {
       _traceInfo.put(routingInstance.getShortName(), 
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
     }
 
@@ -116,7 +116,11 @@ public class ExecutionStatsAggregator {
 
     String operatorId = 
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
     if (operatorId != null) {
-      _operatorIds.add(operatorId);
+      if (_enableTrace) {
+        _operatorStats.put(operatorId, metadata);
+      } else {
+        _operatorStats.put(operatorId, new HashMap<>());
+      }
     }
 
     // Reduce on exceptions.
@@ -340,7 +344,7 @@ public class ExecutionStatsAggregator {
     brokerResponseStats.setNumBlocks(_numBlocks);
     brokerResponseStats.setNumRows(_numRows);
     brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
-    brokerResponseStats.setOperatorIds(_operatorIds);
+    brokerResponseStats.setOperatorStats(_operatorStats);
     brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 8c6ed98ee8..9cc6cbc539 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -106,13 +106,13 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
         }
       }
       if (executionStatsAggregatorMap != null) {
-        executionStatsAggregatorMap.put(stageId, new 
ExecutionStatsAggregator(false));
+        executionStatsAggregatorMap.put(stageId, new 
ExecutionStatsAggregator(true));
       }
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
     return QueryDispatcher.toResultTable(
         QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, 
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
-            executionStatsAggregatorMap, null),
+            executionStatsAggregatorMap, queryPlan),
         queryPlan.getQueryResultFields(), 
queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5b57f59ab0..1169e66780 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.response.broker.BrokerResponseStats;
@@ -248,11 +249,30 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
       for (Integer stageId : stageIdStats.keySet()) {
         // check stats only for leaf stage
         BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
-        if (!brokerResponseStats.getTableNames().isEmpty()) {
-          Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
-          String tableName = brokerResponseStats.getTableNames().get(0);
-          Assert.assertNotNull(_tableToSegmentMap.get(tableName));
-          Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), 
_tableToSegmentMap.get(tableName).size());
+
+        if (brokerResponseStats.getTableNames().isEmpty()) {
+          continue;
+        }
+
+        String tableName = brokerResponseStats.getTableNames().get(0);
+        Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
+
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+        if (tableType == null) {
+          tableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+        }
+
+        Assert.assertNotNull(_tableToSegmentMap.get(tableName));
+        Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), 
_tableToSegmentMap.get(tableName).size());
+
+        Assert.assertFalse(brokerResponseStats.getOperatorStats().isEmpty());
+        Map<String, Map<String, String>> operatorStats = 
brokerResponseStats.getOperatorStats();
+        for (Map.Entry<String, Map<String, String>> entry : 
operatorStats.entrySet()) {
+          if (entry.getKey().contains("LEAF_STAGE")) {
+            
Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName()));
+          } else {
+            
Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_BLOCKS.getName()));
+          }
         }
       }
     });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to