This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-two-broker-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 544396e6f9d517f6b32f9cc8a7d918056157ce41 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Tue May 18 12:30:51 2021 -0700 Add SERVER_PROCESSING and NETTY_CONNECTION to broker query phase metrics --- .../SingleConnectionBrokerRequestHandler.java | 13 +++++++++++-- .../org/apache/pinot/common/metrics/BrokerQueryPhase.java | 2 ++ .../org/apache/pinot/core/transport/ServerResponse.java | 7 +++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 0abf690..e74860c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -87,11 +87,12 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl .submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, timeoutMs); Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse(); - _brokerMetrics - .addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER, System.nanoTime() - scatterGatherStartTimeNs); + long scatterGatherDuration = System.nanoTime() - scatterGatherStartTimeNs; + _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER, scatterGatherDuration); // TODO Use scatterGatherStats as serverStats serverStats.setServerStats(asyncQueryResponse.getStats()); + long maxServerProcessingTimeMs = 0; int numServersQueried = response.size(); long totalResponseSize = 0; Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(HashUtil.getHashMapCapacity(numServersQueried)); @@ -102,8 +103,16 @@ public class 
SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl dataTableMap.put(entry.getKey(), dataTable); totalResponseSize += serverResponse.getResponseSize(); } + long serverProcessingTimeMs = serverResponse.getServerProcessingTimeMs(); + if (serverProcessingTimeMs > -1) { + maxServerProcessingTimeMs = Math.max(maxServerProcessingTimeMs, serverProcessingTimeMs); + } } int numServersResponded = dataTableMap.size(); + long serverMaxProcessingTimeNs = TimeUnit.MILLISECONDS.toNanos(maxServerProcessingTimeMs); + _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SERVER_PROCESSING, serverMaxProcessingTimeNs); + _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.NETTY_CONNECTION, + scatterGatherDuration - serverMaxProcessingTimeNs); long reduceStartTimeNs = System.nanoTime(); long reduceTimeOutMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scatterGatherStartTimeNs); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java index cc8cd58..5a0e449 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerQueryPhase.java @@ -30,6 +30,8 @@ public enum BrokerQueryPhase implements AbstractMetrics.QueryPhase { QUERY_EXECUTION, QUERY_ROUTING, SCATTER_GATHER, + SERVER_PROCESSING, + NETTY_CONNECTION, DESERIALIZATION, REDUCE, REQUEST_CONNECTION_WAIT, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java index 0583884..6db0d0b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java @@ -30,6 +30,7 @@ public class ServerResponse { private final long _startTimeMs; private volatile long 
_submitRequestTimeMs; private volatile long _receiveDataTableTimeMs; + private volatile long _serverProcessingTimeMs; private volatile DataTable _dataTable; private volatile int _responseSize; private volatile int _deserializationTimeMs; @@ -58,6 +59,10 @@ public class ServerResponse { } } + public long getServerProcessingTimeMs() { + return _serverProcessingTimeMs; + } + public int getResponseSize() { return _responseSize; } @@ -79,6 +84,8 @@ public class ServerResponse { void receiveDataTable(DataTable dataTable, int responseSize, int deserializationTimeMs) { _receiveDataTableTimeMs = System.currentTimeMillis(); _dataTable = dataTable; + _serverProcessingTimeMs = + Long.parseLong(_dataTable.getMetadata().getOrDefault(DataTable.MetadataKey.TIME_USED_MS.getName(), "-1")); _responseSize = responseSize; _deserializationTimeMs = deserializationTimeMs; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org