This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-two-broker-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 544396e6f9d517f6b32f9cc8a7d918056157ce41
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue May 18 12:30:51 2021 -0700

    Add SERVER_PROCESSING and NETTY_CONNECTION to broker query phase metrics
---
 .../SingleConnectionBrokerRequestHandler.java               | 13 +++++++++++--
 .../org/apache/pinot/common/metrics/BrokerQueryPhase.java   |  2 ++
 .../org/apache/pinot/core/transport/ServerResponse.java     |  7 +++++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 0abf690..e74860c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -87,11 +87,12 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
         .submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest,
             realtimeRoutingTable, timeoutMs);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    _brokerMetrics
-        .addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER, System.nanoTime() - scatterGatherStartTimeNs);
+    long scatterGatherDuration = System.nanoTime() - scatterGatherStartTimeNs;
+    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER, scatterGatherDuration);
     // TODO Use scatterGatherStats as serverStats
     serverStats.setServerStats(asyncQueryResponse.getStats());
 
+    long maxServerProcessingTimeMs = 0;
     int numServersQueried = response.size();
     long totalResponseSize = 0;
     Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
@@ -102,8 +103,16 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
         dataTableMap.put(entry.getKey(), dataTable);
         totalResponseSize += serverResponse.getResponseSize();
       }
+      long serverProcessingTimeMs = serverResponse.getServerProcessingTimeMs();
+      if (serverProcessingTimeMs > -1) {
+        maxServerProcessingTimeMs = Math.max(maxServerProcessingTimeMs, serverProcessingTimeMs);
+      }
     }
     int numServersResponded = dataTableMap.size();
+    long serverMaxProcessingTimeNs = TimeUnit.MILLISECONDS.toNanos(maxServerProcessingTimeMs);
+    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SERVER_PROCESSING, serverMaxProcessingTimeNs);
+    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.NETTY_CONNECTION,
+        scatterGatherDuration - serverMaxProcessingTimeNs);
 
     long reduceStartTimeNs = System.nanoTime();
     long reduceTimeOutMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scatterGatherStartTimeNs);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java
index cc8cd58..5a0e449 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java
@@ -30,6 +30,8 @@ public enum BrokerQueryPhase implements AbstractMetrics.QueryPhase {
   QUERY_EXECUTION,
   QUERY_ROUTING,
   SCATTER_GATHER,
+  SERVER_PROCESSING,
+  NETTY_CONNECTION,
   DESERIALIZATION,
   REDUCE,
   REQUEST_CONNECTION_WAIT,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
index 0583884..6db0d0b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
@@ -30,6 +30,7 @@ public class ServerResponse {
   private final long _startTimeMs;
   private volatile long _submitRequestTimeMs;
   private volatile long _receiveDataTableTimeMs;
+  private volatile long _serverProcessingTimeMs;
   private volatile DataTable _dataTable;
   private volatile int _responseSize;
   private volatile int _deserializationTimeMs;
@@ -58,6 +59,10 @@ public class ServerResponse {
     }
   }
 
+  public long getServerProcessingTimeMs() {
+    return _serverProcessingTimeMs;
+  }
+
   public int getResponseSize() {
     return _responseSize;
   }
@@ -79,6 +84,8 @@ public class ServerResponse {
   void receiveDataTable(DataTable dataTable, int responseSize, int deserializationTimeMs) {
     _receiveDataTableTimeMs = System.currentTimeMillis();
     _dataTable = dataTable;
+    _serverProcessingTimeMs =
+        Long.parseLong(_dataTable.getMetadata().getOrDefault(DataTable.MetadataKey.TIME_USED_MS.getName(), "-1"));
     _responseSize = responseSize;
     _deserializationTimeMs = deserializationTimeMs;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to