This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch consumingStats
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 788e7cc151e407d5e1c13f1da748b270e96d6aac
Author: Sunitha Beeram <sbee...@sbeeram-ld2.linkedin.biz>
AuthorDate: Tue May 21 10:55:51 2019 -0700

    Emit freshnessLag metric from broker for queries hitting consuming segments
---
 .../requesthandler/BaseBrokerRequestHandler.java     |  6 ++++--
 .../org/apache/pinot/common/metrics/BrokerMeter.java |  3 +++
 .../apache/pinot/common/response/BrokerResponse.java | 10 ++++++++++
 .../common/response/broker/BrokerResponseNative.java |  1 -
 .../query/executor/ServerQueryExecutorV1Impl.java    |  4 +++-
 .../pinot/core/query/reduce/BrokerReduceService.java | 20 ++++++++++++++++----
 .../pinot/core/query/scheduler/QueryScheduler.java   | 12 +++++++++---
 7 files changed, 45 insertions(+), 11 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7bc689f..5f503dd 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, 
totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME 
appended)
       LOGGER.info(
-          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, 
segments(queried/processed/matched):{}/{}/{} "
-              + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, 
serverStats:{}, query:{}", requestId,
+          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, 
segments(queried/processed/matched/consuming):{}/{}/{}/{},"
+              + " consumingFreshnessTimeMs:{},"
+              + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, 
serverStats:{}, query:{}", requestId,
           brokerRequest.getQuerySource().getTableName(), totalTimeMs, 
brokerResponse.getNumDocsScanned(),
           brokerResponse.getTotalDocs(), 
brokerResponse.getNumEntriesScannedInFilter(),
           brokerResponse.getNumEntriesScannedPostFilter(), 
brokerResponse.getNumSegmentsQueried(),
           brokerResponse.getNumSegmentsProcessed(), 
brokerResponse.getNumSegmentsMatched(),
+          brokerResponse.getNumConsumingSegmentsQueried(), 
brokerResponse.getMinConsumingFreshnessTimeMs(),
           brokerResponse.getNumServersResponded(), 
brokerResponse.getNumServersQueried(),
           brokerResponse.isNumGroupsLimitReached(), 
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           StringUtils.substring(query, 0, _queryLogLength));
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 170e745..facd15f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -63,6 +63,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   ENTRIES_SCANNED_IN_FILTER("documents", false),
   ENTRIES_SCANNED_POST_FILTER("documents", false),
 
+  // metric tracking the freshness lag for consuming segments
+  FRESHNESS_LAG_MS("latency", false),
+
   REQUEST_CONNECTION_TIMEOUTS("timeouts", false),
   HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 72b95e2..ec2960e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -98,6 +98,16 @@ public interface BrokerResponse {
   long getNumSegmentsMatched();
 
   /**
+   * Get number of consuming segments that were queried.
+   */
+  long getNumConsumingSegmentsQueried();
+
+  /**
+   * Get the minimum freshness timestamp across consuming segments that were 
queried.
+   */
+  long getMinConsumingFreshnessTimeMs();
+
+  /**
    * Get total number of documents within the table hit.
    */
   long getTotalDocs();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 214d942..1ccedd6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -227,7 +227,6 @@ public class BrokerResponseNative implements BrokerResponse 
{
     _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
   }
 
-
   @JsonProperty("totalDocs")
   @Override
   public long getTotalDocs() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 3058539..5f70dc2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -170,11 +170,13 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       }
     }
 
-    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    long minConsumingFreshnessTimeMs = -1;
     if (numConsumingSegmentsQueried > 0) {
       if (minIngestionTimeMs == Long.MAX_VALUE) {
         LOGGER.debug("Did not find valid ingestionTimestamp across consuming 
segments! Using indexTime instead");
         minConsumingFreshnessTimeMs = minIndexTimeMs;
+      } else {
+        minConsumingFreshnessTimeMs = minIngestionTimeMs;
       }
       LOGGER.debug("Querying {} consuming segments with min 
minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried, 
minConsumingFreshnessTimeMs);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d539bf7..dde809e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public class BrokerReduceService implements 
ReduceService<BrokerResponseNative> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerReduceService.class);
+  private static final long INVALID_FRESHNESS = -1;
 
   @Nonnull
   @Override
@@ -143,9 +144,13 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
         numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
       }
 
-      String minConsumingIndexTsString = 
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
-      if (minConsumingIndexTsString != null) {
-        minConsumingFreshnessTimeMs = 
Math.min(Long.parseLong(minConsumingIndexTsString), 
minConsumingFreshnessTimeMs);
+      String minConsumingFreshnessTimeMsString = 
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+      if (minConsumingFreshnessTimeMsString != null) {
+        long freshness = Long.parseLong(minConsumingFreshnessTimeMsString);
+        // ignore invalid values
+        if (freshness > INVALID_FRESHNESS) {
+          minConsumingFreshnessTimeMs = Math.min(freshness, 
minConsumingFreshnessTimeMs);
+        }
       }
 
       String numTotalRawDocsString = 
metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
@@ -182,7 +187,9 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
     brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
     if (numConsumingSegmentsQueried > 0) {
       if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
-        LOGGER.error("Invalid lastIndexedTimestamp across {} consuming 
segments", numConsumingSegmentsQueried);
+        LOGGER.error("Invalid freshness time across {} consuming segments", 
numConsumingSegmentsQueried);
+        // use the invalid value (-1) for clear logging
+        minConsumingFreshnessTimeMs = INVALID_FRESHNESS;
       }
       
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
       
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
@@ -197,6 +204,11 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
           .addMeteredTableValue(rawTableName, 
BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
       brokerMetrics
           .addMeteredTableValue(rawTableName, 
BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+      if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) {
+        brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.FRESHNESS_LAG_MS,
+            System.currentTimeMillis() - minConsumingFreshnessTimeMs);
+      }
     }
 
     // Parse the option from request whether to preserve the type
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 627cda7..dcd5ee6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -54,6 +54,7 @@ public abstract class QueryScheduler {
 
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
+  private static final String INVALID_FRESHNESS_MS = "-1";
   private static final String QUERY_LOG_MAX_RATE_KEY = 
"query.log.maxRatePerSecond";
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
@@ -173,6 +174,10 @@ public abstract class QueryScheduler {
         
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, 
INVALID_SEGMENTS_COUNT));
     long numSegmentsMatched =
         
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, 
INVALID_SEGMENTS_COUNT));
+    long numSegmentsConsuming =
+        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED,
 INVALID_SEGMENTS_COUNT));
+    long minConsumingFreshnessMs =
+        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
 INVALID_FRESHNESS_MS));
 
     if (numDocsScanned > 0) {
       serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -192,10 +197,11 @@ public abstract class QueryScheduler {
 
     if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, 
numDocsScanned)) {
       LOGGER.info(
-          "Processed 
requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-          requestId, tableNameWithType, numSegmentsQueried, 
numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+          "Processed 
requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+          requestId, tableNameWithType, numSegmentsQueried, 
numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs,
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
queryRequest.getBrokerId(),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
minConsumingFreshnessMs,
+          queryRequest.getBrokerId(),
           numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, name());
 
       // Limit the dropping log message at most once per second.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to