This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c5dd6df27c Add Statistics grouped at Stage ID level in the V2 Engine 
Response (#10337)
c5dd6df27c is described below

commit c5dd6df27c384b523c77adcb66956bf0e5c37251
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Feb 28 15:49:46 2023 +0530

    Add Statistics grouped at Stage ID level in the V2 Engine Response (#10337)
    
    * WIP: aggregate stats on stage level for response
    
    * Make response backward compatible
    
    * Add new metadata keys to enum and replace hardcoded values; also add 
table names to the stats
    
    * Rename operatorExecutionTime to stageExecutionTime for correct 
understanding
    
    * Remove sysout
    
    * Remove duplicate code inside BrokerResponseStats class
    
    * Remove unused constants from OperatorUtils and fix formatting
    
    * Add test for stage level stats as well as BrokerResponseNativeV2
    
    * Add followup TODOs and move method to utils class
    
    ---------
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../MultiStageBrokerRequestHandler.java            |  36 ++++++-
 .../apache/pinot/common/datatable/DataTable.java   |  15 ++-
 .../response/broker/BrokerResponseNativeV2.java    |  93 +++++++++++++++++
 .../response/broker/BrokerResponseStats.java       | 110 +++++++++++++++++++++
 .../query/reduce/ExecutionStatsAggregator.java     |  76 ++++++++++++--
 .../query/runtime/operator/MultiStageOperator.java |   2 +
 .../query/runtime/operator/OperatorStats.java      |  15 ++-
 .../runtime/operator/utils/OperatorUtils.java      |  14 ++-
 .../pinot/query/service/QueryDispatcher.java       |  20 ++--
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   7 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  56 ++++++++---
 11 files changed, 395 insertions(+), 49 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 63c1e8f9ad..e2dae3b75f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,8 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -58,6 +61,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,7 +137,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     return handleRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext);
   }
 
-  private BrokerResponseNative handleRequest(long requestId, String query,
+  private BrokerResponse handleRequest(long requestId, String query,
       @Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, 
@Nullable RequesterIdentity requesterIdentity,
       RequestContext requestContext)
       throws Exception {
@@ -166,16 +170,20 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
 
     ResultTable queryResults;
-    ExecutionStatsAggregator executionStatsAggregator = new 
ExecutionStatsAggregator(false);
+    Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
+    for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
+      stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false));
+    }
+
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, 
_mailboxService, queryTimeoutMs,
-          sqlNodeAndOptions.getOptions(), executionStatsAggregator);
+          sqlNodeAndOptions.getOptions(), stageIdStatsMap);
     } catch (Exception e) {
       LOGGER.info("query execution failed", e);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
     }
 
-    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
     long executionEndTimeNs = System.nanoTime();
 
     // Set total query processing time
@@ -184,7 +192,25 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
 
-    executionStatsAggregator.setStats(brokerResponse);
+    for (Map.Entry<Integer, ExecutionStatsAggregator> entry : 
stageIdStatsMap.entrySet()) {
+      if (entry.getKey() == 0) {
+        // Root stats are aggregated and added separately to broker response 
for backward compatibility
+        entry.getValue().setStats(brokerResponse);
+        continue;
+      }
+
+      BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+      List<String> tableNames = 
queryPlan.getStageMetadataMap().get(entry.getKey()).getScannedTables();
+      if (tableNames.size() > 0) {
+        //TODO: Only using first table to assign broker metrics
+        // find a way to split metrics in case of multiple tables
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNames.get(0));
+        entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats, 
_brokerMetrics);
+      } else {
+        entry.getValue().setStageLevelStats(null, brokerResponseStats, null);
+      }
+      brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
+    }
 
     requestContext.setQueryProcessingTime(totalTimeMs);
     augmentStatistics(requestContext, brokerResponse);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 9bac9706a7..ea996cba6d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -102,7 +102,7 @@ public interface DataTable {
    */
   enum MetadataKey {
     UNKNOWN(0, "unknown", MetadataValueType.STRING),
-    TABLE(1, "table", MetadataValueType.STRING), // NOTE: this key is only 
used in PrioritySchedulerTest
+    TABLE(1, "table", MetadataValueType.STRING),
     NUM_DOCS_SCANNED(2, "numDocsScanned", MetadataValueType.LONG),
     NUM_ENTRIES_SCANNED_IN_FILTER(3, "numEntriesScannedInFilter", 
MetadataValueType.LONG),
     NUM_ENTRIES_SCANNED_POST_FILTER(4, "numEntriesScannedPostFilter", 
MetadataValueType.LONG),
@@ -110,8 +110,6 @@ public interface DataTable {
     NUM_SEGMENTS_PROCESSED(6, "numSegmentsProcessed", MetadataValueType.INT),
     NUM_SEGMENTS_MATCHED(7, "numSegmentsMatched", MetadataValueType.INT),
     NUM_CONSUMING_SEGMENTS_QUERIED(8, "numConsumingSegmentsQueried", 
MetadataValueType.INT),
-    NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed", 
MetadataValueType.INT),
-    NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched", 
MetadataValueType.INT),
     MIN_CONSUMING_FRESHNESS_TIME_MS(9, "minConsumingFreshnessTimeMs", 
MetadataValueType.LONG),
     TOTAL_DOCS(10, "totalDocs", MetadataValueType.LONG),
     NUM_GROUPS_LIMIT_REACHED(11, "numGroupsLimitReached", 
MetadataValueType.STRING),
@@ -128,11 +126,17 @@ public interface DataTable {
     NUM_SEGMENTS_PRUNED_BY_LIMIT(22, "numSegmentsPrunedByLimit", 
MetadataValueType.INT),
     NUM_SEGMENTS_PRUNED_BY_VALUE(23, "numSegmentsPrunedByValue", 
MetadataValueType.INT),
     EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS(24, 
"explainPlanNumEmptyFilterSegments", MetadataValueType.INT),
-    EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25, 
"explainPlanNumMatchAllFilterSegments", MetadataValueType.INT);
+    EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25, 
"explainPlanNumMatchAllFilterSegments", MetadataValueType.INT),
+    NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed", 
MetadataValueType.INT),
+    NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched", 
MetadataValueType.INT),
+    NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT),
+    NUM_ROWS(29, "numRows", MetadataValueType.INT),
+    OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", 
MetadataValueType.LONG),
+    OPERATOR_ID(31, "operatorId", MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward 
compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 27;
+    private static final int MAX_ID = 31;
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new 
MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
@@ -186,6 +190,7 @@ public interface DataTable {
         int id = key.getId();
         Preconditions.checkArgument(id >= 0 && id <= MAX_ID,
             "Invalid id: %s for MetadataKey: %s, must be in the range of [0, 
MAX_ID(%s)]", id, key, MAX_ID);
+
         Preconditions.checkArgument(ID_TO_ENUM_KEY_MAP[id] == null,
             "Duplicate id: %s defined for MetadataKey: %s and %s", id, 
ID_TO_ENUM_KEY_MAP[id], key);
         ID_TO_ENUM_KEY_MAP[id] = key;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
new file mode 100644
index 0000000000..79605773d7
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * This class implements pinot-broker's response format for any given query.
+ * All fields are either primitive data types, or native objects (as opposed to 
JSONObjects).
+ *
+ * Supports serialization via JSON.
+ */
+@JsonPropertyOrder({
+    "resultTable", "stageStats", "exceptions", "numServersQueried", 
"numServersResponded", "numSegmentsQueried",
+    "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+    "numConsumingSegmentsMatched", "numDocsScanned", 
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numGroupsLimitReached", "totalDocs", "timeUsedMs", 
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
+    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs", "segmentStatistics",
+    "traceInfo"
+})
+public class BrokerResponseNativeV2 extends BrokerResponseNative {
+  private final Map<Integer, BrokerResponseStats> _stageIdStats = new 
HashMap<>();
+
+  public BrokerResponseNativeV2() {
+  }
+
+  public BrokerResponseNativeV2(ProcessingException exception) {
+    super(exception);
+  }
+
+  public BrokerResponseNativeV2(List<ProcessingException> exceptions) {
+    super(exceptions);
+  }
+
+  /** Generate EXPLAIN PLAN output when queries are evaluated by Broker 
without going to the Server. */
+  private static BrokerResponseNativeV2 getBrokerResponseExplainPlanOutput() {
+    BrokerResponseNativeV2 brokerResponse = BrokerResponseNativeV2.empty();
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(new Object[]{"BROKER_EVALUATE", 0, -1});
+    brokerResponse.setResultTable(new 
ResultTable(DataSchema.EXPLAIN_RESULT_SCHEMA, rows));
+    return brokerResponse;
+  }
+
+  /**
+   * Get a new empty {@link BrokerResponseNativeV2}.
+   */
+  public static BrokerResponseNativeV2 empty() {
+    return new BrokerResponseNativeV2();
+  }
+
+  public static BrokerResponseNativeV2 fromJsonString(String jsonString)
+      throws IOException {
+    return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class);
+  }
+
+  public void addStageStat(Integer stageId, BrokerResponseStats 
brokerResponseStats) {
+    if (!brokerResponseStats.getOperatorIds().isEmpty()) {
+      _stageIdStats.put(stageId, brokerResponseStats);
+    }
+  }
+
+  @JsonProperty("stageStats")
+  public Map<Integer, BrokerResponseStats> getStageIdStats() {
+    return _stageIdStats;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
new file mode 100644
index 0000000000..60d7ab4813
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+// TODO: Decouple the execution stats aggregator logic and make it into a util 
that can aggregate 2 values with the
+//  same metadataKey
+// TODO: Replace member fields with a simple map of <MetadataKey, Object>
+// TODO: Add a subStat field, stage level subStats will contain each operator 
stats
+@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", 
"stageExecutionTimeMs", "numServersQueried",
+    "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", 
"numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", 
"numConsumingSegmentsMatched",
+    "numDocsScanned", "numEntriesScannedInFilter", 
"numEntriesScannedPostFilter", "numGroupsLimitReached",
+    "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", 
"realtimeThreadCpuTimeNs",
+    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
+    "traceInfo", "operatorIds", "tableNames"})
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class BrokerResponseStats extends BrokerResponseNative {
+
+  private int _numBlocks = 0;
+  private int _numRows = 0;
+  private long _stageExecutionTimeMs = 0;
+  private List<String> _operatorIds = new ArrayList<>();
+  private List<String> _tableNames = new ArrayList<>();
+
+  @Override
+  public ResultTable getResultTable() {
+    return null;
+  }
+
+  @JsonProperty("numBlocks")
+  public int getNumBlocks() {
+    return _numBlocks;
+  }
+
+  @JsonProperty("numBlocks")
+  public void setNumBlocks(int numBlocks) {
+    _numBlocks = numBlocks;
+  }
+
+  @JsonProperty("numRows")
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @JsonProperty("numRows")
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @JsonProperty("stageExecutionTimeMs")
+  public long getStageExecutionTimeMs() {
+    return _stageExecutionTimeMs;
+  }
+
+  @JsonProperty("stageExecutionTimeMs")
+  public void setStageExecutionTimeMs(long stageExecutionTimeMs) {
+    _stageExecutionTimeMs = stageExecutionTimeMs;
+  }
+
+  public String toJsonString()
+      throws IOException {
+    return JsonUtils.objectToString(this);
+  }
+
+  @JsonProperty("operatorIds")
+  public List<String> getOperatorIds() {
+    return _operatorIds;
+  }
+
+  @JsonProperty("operatorIds")
+  public void setOperatorIds(List<String> operatorIds) {
+    _operatorIds = operatorIds;
+  }
+
+  @JsonProperty("tableNames")
+  public List<String> getTableNames() {
+    return _tableNames;
+  }
+
+  @JsonProperty("tableNames")
+  public void setTableNames(List<String> tableNames) {
+    _tableNames = tableNames;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 0c93252c51..889b859c29 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -19,24 +19,32 @@
 package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class ExecutionStatsAggregator {
   private final List<QueryProcessingException> _processingExceptions = new 
ArrayList<>();
+  private final List<String> _operatorIds = new ArrayList<>();
+  private final Set<String> _tableNames = new HashSet<>();
   private final Map<String, String> _traceInfo = new HashMap<>();
   private final boolean _enableTrace;
 
@@ -66,6 +74,9 @@ public class ExecutionStatsAggregator {
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
   private boolean _numGroupsLimitReached = false;
+  private int _numBlocks = 0;
+  private int _numRows = 0;
+  private long _stageExecutionTimeMs = 0;
 
   public ExecutionStatsAggregator(boolean enableTrace) {
     _enableTrace = enableTrace;
@@ -82,6 +93,32 @@ public class ExecutionStatsAggregator {
       _traceInfo.put(routingInstance.getShortName(), 
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
     }
 
+    String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName());
+    String tableName = null;
+
+    if (tableNamesStr != null) {
+      List<String> tableNames = 
Arrays.stream(tableNamesStr.split("::")).collect(Collectors.toList());
+      _tableNames.addAll(tableNames);
+
+      //TODO: Decide a strategy to split stageLevel stats across tables for 
brokerMetrics
+      // assigning everything to the first table only for now
+      tableName = tableNames.get(0);
+    }
+
+    TableType tableType = null;
+    if (routingInstance != null) {
+      tableType = routingInstance.getTableType();
+    } else if (tableName != null) {
+      tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    } else {
+      tableType = null;
+    }
+
+    String operatorId = 
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
+    if (operatorId != null) {
+      _operatorIds.add(operatorId);
+    }
+
     // Reduce on exceptions.
     for (int key : exceptions.keySet()) {
       _processingExceptions.add(new QueryProcessingException(key, 
exceptions.get(key)));
@@ -141,8 +178,8 @@ public class ExecutionStatsAggregator {
     }
 
     String threadCpuTimeNsString = 
metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
-    if (routingInstance != null && threadCpuTimeNsString != null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && threadCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
       } else {
         _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
@@ -151,8 +188,8 @@ public class ExecutionStatsAggregator {
 
     String systemActivitiesCpuTimeNsString =
         
metadata.get(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
-    if (routingInstance != null && systemActivitiesCpuTimeNsString != null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && systemActivitiesCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineSystemActivitiesCpuTimeNs += 
Long.parseLong(systemActivitiesCpuTimeNsString);
       } else {
         _realtimeSystemActivitiesCpuTimeNs += 
Long.parseLong(systemActivitiesCpuTimeNsString);
@@ -161,8 +198,8 @@ public class ExecutionStatsAggregator {
 
     String responseSerializationCpuTimeNsString =
         metadata.get(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    if (routingInstance != null && responseSerializationCpuTimeNsString != 
null) {
-      if (routingInstance.getTableType() == TableType.OFFLINE) {
+    if (tableType != null && responseSerializationCpuTimeNsString != null) {
+      if (tableType == TableType.OFFLINE) {
         _offlineResponseSerializationCpuTimeNs += 
Long.parseLong(responseSerializationCpuTimeNsString);
       } else {
         _realtimeResponseSerializationCpuTimeNs += 
Long.parseLong(responseSerializationCpuTimeNsString);
@@ -200,6 +237,22 @@ public class ExecutionStatsAggregator {
     }
     _numGroupsLimitReached |=
         
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+
+    // Multi-stage operator stats; each key is optional, so every parse is guarded by its own value.
+    String numBlocksString = 
metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
+    if (numBlocksString != null) {
+      _numBlocks += Long.parseLong(numBlocksString);
+    }
+
+    String numRowsString = 
metadata.get(DataTable.MetadataKey.NUM_ROWS.getName());
+    if (numRowsString != null) {
+      _numRows += Long.parseLong(numRowsString);
+    }
+
+    String operatorExecutionTimeString = 
metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName());
+    if (operatorExecutionTimeString != null) {
+      _stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString);
+    }
   }
 
   public void setStats(BrokerResponseNative brokerResponseNative) {
@@ -280,6 +333,17 @@ public class ExecutionStatsAggregator {
     }
   }
 
+  public void setStageLevelStats(@Nullable String rawTableName, 
BrokerResponseStats brokerResponseStats,
+      @Nullable BrokerMetrics brokerMetrics) {
+    setStats(rawTableName, brokerResponseStats, brokerMetrics);
+
+    brokerResponseStats.setNumBlocks(_numBlocks);
+    brokerResponseStats.setNumRows(_numRows);
+    brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
+    brokerResponseStats.setOperatorIds(_operatorIds);
+    brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
+  }
+
   private void withNotNullLongMetadata(Map<String, String> metadata, 
DataTable.MetadataKey key, LongConsumer consumer) {
     String strValue = metadata.get(key.getName());
     if (strValue != null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index d9c0104d91..a6bc863457 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -74,6 +75,7 @@ public abstract class MultiStageOperator implements 
Operator<TransferableBlock>,
           }
 
           if (!_operatorStats.getExecutionStats().isEmpty()) {
+            
_operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), 
_operatorId);
             _operatorStatsMap.put(_operatorId, _operatorStats);
           }
           return TransferableBlockUtils.getEndOfStreamTransferableBlock(
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 09a8d7ac60..74cefbbc90 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -22,6 +22,7 @@ import com.google.common.base.Stopwatch;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 
@@ -38,7 +39,7 @@ public class OperatorStats {
 
   private int _numBlock = 0;
   private int _numRows = 0;
-  private Map<String, String> _executionStats;
+  private final Map<String, String> _executionStats;
 
   public OperatorStats(long requestId, int stageId, VirtualServerAddress 
serverAddress, String operatorType) {
     _stageId = stageId;
@@ -65,14 +66,18 @@ public class OperatorStats {
     _numRows += numRows;
   }
 
+  public void recordSingleStat(String key, String stat) {
+    _executionStats.put(key, stat);
+  }
+
   public void recordExecutionStats(Map<String, String> executionStats) {
-    _executionStats = executionStats;
+    _executionStats.putAll(executionStats);
   }
 
   public Map<String, String> getExecutionStats() {
-    _executionStats.put(OperatorUtils.NUM_BLOCKS, String.valueOf(_numBlock));
-    _executionStats.put(OperatorUtils.NUM_ROWS, String.valueOf(_numRows));
-    _executionStats.put(OperatorUtils.THREAD_EXECUTION_TIME,
+    _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_BLOCKS.getName(), 
String.valueOf(_numBlock));
+    _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(), 
String.valueOf(_numRows));
+    
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
         String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
     return _executionStats;
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 532befd702..ab79339891 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -20,10 +20,13 @@ package org.apache.pinot.query.runtime.operator.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -32,10 +35,6 @@ import org.slf4j.LoggerFactory;
 
 
 public class OperatorUtils {
-  public static final String NUM_BLOCKS = "numBlocks";
-  public static final String NUM_ROWS = "numRows";
-  public static final String THREAD_EXECUTION_TIME = "threadExecutionTime";
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OperatorUtils.class);
   private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new 
HashMap<>();
 
@@ -69,6 +68,13 @@ public class OperatorUtils {
     return functionName;
   }
 
+  public static void recordTableName(OperatorStats operatorStats, 
StageMetadata operatorStageMetadata) {
+    if (!operatorStageMetadata.getScannedTables().isEmpty()) {
+      operatorStats.recordSingleStat(DataTable.MetadataKey.TABLE.getName(),
+          Joiner.on("::").join(operatorStageMetadata.getScannedTables()));
+    }
+  }
+
   public static String operatorStatsToJson(OperatorStats operatorStats) {
     try {
       Map<String, Object> jsonOut = new HashMap<>();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 9c7b73a9e0..18511d9ee7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -69,7 +69,7 @@ public class QueryDispatcher {
 
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, 
Map<String, String> queryOptions,
-      ExecutionStatsAggregator executionStatsAggregator)
+      Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
     // submit all the distributed stages.
     int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
@@ -80,7 +80,7 @@ public class QueryDispatcher {
         reduceNode.getSenderStageId(), reduceStageId, 
reduceNode.getDataSchema(),
         new VirtualServerAddress(mailboxService.getHostname(), 
mailboxService.getMailboxPort(), 0), timeoutMs);
     List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, 
executionStatsAggregator);
+        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, 
executionStatsAggregator, queryPlan);
     return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
         queryPlan.getQueryStageMap().get(0).getDataSchema());
   }
@@ -128,11 +128,11 @@ public class QueryDispatcher {
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator 
mailboxReceiveOperator, long timeoutMs) {
-    return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null);
+    return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null, null);
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator 
mailboxReceiveOperator, long timeoutMs,
-      @Nullable ExecutionStatsAggregator executionStatsAggregator) {
+      @Nullable Map<Integer, ExecutionStatsAggregator> 
executionStatsAggregatorMap, QueryPlan queryPlan) {
     List<DataBlock> resultDataBlocks = new ArrayList<>();
     TransferableBlock transferableBlock;
     long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -147,11 +147,19 @@ public class QueryDispatcher {
       if (transferableBlock.isNoOpBlock()) {
         continue;
       } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (executionStatsAggregator != null) {
+        if (executionStatsAggregatorMap != null) {
           for (Map.Entry<String, OperatorStats> entry : 
transferableBlock.getResultMetadata().entrySet()) {
             LOGGER.info("Broker Query Execution Stats - OperatorId: {}, 
OperatorStats: {}", entry.getKey(),
                 OperatorUtils.operatorStatsToJson(entry.getValue()));
-            executionStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
+            OperatorStats operatorStats = entry.getValue();
+            ExecutionStatsAggregator rootStatsAggregator = 
executionStatsAggregatorMap.get(0);
+            ExecutionStatsAggregator stageStatsAggregator = 
executionStatsAggregatorMap.get(operatorStats.getStageId());
+            if (queryPlan != null) {
+              StageMetadata operatorStageMetadata = 
queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
+              OperatorUtils.recordTableName(operatorStats, 
operatorStageMetadata);
+            }
+            rootStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
+            stageStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
           }
         }
         return resultDataBlocks;
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 6ebc8cdb1c..53be2be4a5 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -82,7 +82,7 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
   // --------------------------------------------------------------------------
   // QUERY UTILS
   // --------------------------------------------------------------------------
-  protected List<Object[]> queryRunner(String sql, ExecutionStatsAggregator 
executionStatsAggregator) {
+  protected List<Object[]> queryRunner(String sql, Map<Integer, 
ExecutionStatsAggregator> executionStatsAggregatorMap) {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     Map<String, String> requestMetadataMap =
         ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()),
@@ -105,11 +105,14 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
           
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan, 
requestMetadataMap);
         }
       }
+      if (executionStatsAggregatorMap != null) {
+        executionStatsAggregatorMap.put(stageId, new 
ExecutionStatsAggregator(false));
+      }
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
     return QueryDispatcher.toResultTable(
         QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, 
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
-            executionStatsAggregator),
+            executionStatsAggregatorMap, null),
         queryPlan.getQueryResultFields(), 
queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index d311e9f467..5b57f59ab0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -38,7 +38,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
@@ -228,19 +229,40 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
   @Test(dataProvider = "testResourceQueryTestCaseProviderWithMetadata")
   public void testQueryTestCasesWithMetadata(String testCaseName, String sql, 
String expect, int numSegments)
       throws Exception {
-    ExecutionStatsAggregator executionStatsAggregator = new 
ExecutionStatsAggregator(false);
-    runQuery(sql, expect, executionStatsAggregator).ifPresent(rows -> {
-      BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
-      executionStatsAggregator.setStats(brokerResponseNative);
+    Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap = new 
HashMap<>();
+    runQuery(sql, expect, executionStatsAggregatorMap).ifPresent(rows -> {
+      BrokerResponseNativeV2 brokerResponseNative = new 
BrokerResponseNativeV2();
+      executionStatsAggregatorMap.get(0).setStats(brokerResponseNative);
+      Assert.assertFalse(executionStatsAggregatorMap.isEmpty());
+      for (Integer stageId : executionStatsAggregatorMap.keySet()) {
+        if (stageId > 0) {
+          BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+          executionStatsAggregatorMap.get(stageId).setStageLevelStats(null, 
brokerResponseStats, null);
+          brokerResponseNative.addStageStat(stageId, brokerResponseStats);
+        }
+      }
+
       Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), 
numSegments);
+
+      Map<Integer, BrokerResponseStats> stageIdStats = 
brokerResponseNative.getStageIdStats();
+      for (Integer stageId : stageIdStats.keySet()) {
+        // check stats only for leaf stage
+        BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
+        if (!brokerResponseStats.getTableNames().isEmpty()) {
+          Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
+          String tableName = brokerResponseStats.getTableNames().get(0);
+          Assert.assertNotNull(_tableToSegmentMap.get(tableName));
+          Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(), 
_tableToSegmentMap.get(tableName).size());
+        }
+      }
     });
   }
 
   private Optional<List<Object[]>> runQuery(String sql, final String except,
-      ExecutionStatsAggregator executionStatsAggregator) {
+      Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
     try {
       // query pinot
-      List<Object[]> resultRows = queryRunner(sql, executionStatsAggregator);
+      List<Object[]> resultRows = queryRunner(sql, 
executionStatsAggregatorMap);
 
       Assert.assertNull(except,
           "Expected error with message '" + except + "'. But instead rows were 
returned: " + resultRows.stream()
@@ -299,29 +321,30 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
       throws Exception {
     Map<String, QueryTestCase> testCaseMap = getTestCases();
     List<Object[]> providerContent = new ArrayList<>();
+    Set<String> validTestCases = new HashSet<>();
+    validTestCases.add("basic_test");
+    validTestCases.add("framework_test");
+
     for (Map.Entry<String, QueryTestCase> testCaseEntry : 
testCaseMap.entrySet()) {
       String testCaseName = testCaseEntry.getKey();
+      if (!validTestCases.contains(testCaseName)) {
+        continue;
+      }
+
       if (testCaseEntry.getValue()._ignored) {
         continue;
       }
 
       List<QueryTestCase.Query> queryCases = testCaseEntry.getValue()._queries;
       for (QueryTestCase.Query queryCase : queryCases) {
+
         if (queryCase._ignored) {
           continue;
         }
 
-        if (queryCase._outputs != null) {
+        if (queryCase._outputs == null) {
           String sql = replaceTableName(testCaseName, queryCase._sql);
-          if (!sql.contains("basic_test")) {
-            continue;
-          }
 
-          List<List<Object>> orgRows = queryCase._outputs;
-          List<Object[]> expectedRows = new ArrayList<>(orgRows.size());
-          for (List<Object> objs : orgRows) {
-            expectedRows.add(objs.toArray());
-          }
           int segmentCount = 0;
           for (String tableName : testCaseEntry.getValue()._tables.keySet()) {
             segmentCount +=
@@ -401,6 +424,7 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
         {
           HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
           hashSet.retainAll(testCases.keySet());
+
           if (!hashSet.isEmpty()) {
             throw new IllegalArgumentException("testCase already exist for the 
following names: " + hashSet);
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to