This is an automated email from the ASF dual-hosted git repository. sunithabeeram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 06baad6 Emit freshnessLag metric from broker for queries hitting consuming segments (#4229) 06baad6 is described below commit 06baad647bfb04ac7c8c1796a7318edea47815d2 Author: Sunitha Beeram <sbee...@linkedin.com> AuthorDate: Tue May 21 22:21:10 2019 -0700 Emit freshnessLag metric from broker for queries hitting consuming segments (#4229) * Emit freshnessLag metric from broker for queries hitting consuming segments * Address review comments * Minor formatting fixes --- .../broker/requesthandler/BaseBrokerRequestHandler.java | 6 ++++-- .../org/apache/pinot/common/metrics/BrokerTimer.java | 4 +++- .../apache/pinot/common/response/BrokerResponse.java | 10 ++++++++++ .../common/response/broker/BrokerResponseNative.java | 6 +++++- .../core/query/executor/ServerQueryExecutorV1Impl.java | 2 +- .../pinot/core/query/reduce/BrokerReduceService.java | 16 ++++++++++------ .../pinot/core/query/scheduler/QueryScheduler.java | 17 ++++++++++++----- 7 files changed, 45 insertions(+), 16 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 7bc689f..89d3470 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) { // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) LOGGER.info( - "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} " - + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, + "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}," + + " segments(queried/processed/matched/consuming):{}/{}/{}/{}, consumingFreshnessTimeMs:{}," + + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(), + brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), StringUtils.substring(query, 0, _queryLogLength)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java index 694db60..6c98dab 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java @@ -27,7 +27,9 @@ import org.apache.pinot.common.Utils; */ public enum BrokerTimer implements AbstractMetrics.Timer { ROUTING_TABLE_UPDATE_TIME(true), - CLUSTER_CHANGE_QUEUE_TIME(true); + CLUSTER_CHANGE_QUEUE_TIME(true), + // metric tracking the freshness lag for consuming segments + FRESHNESS_LAG_MS(false); private final String timerName; private final boolean global; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 72b95e2..ec2960e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -98,6 +98,16 @@ public interface BrokerResponse { long getNumSegmentsMatched(); /** + * Get number of consuming segments that were queried. + */ + long getNumConsumingSegmentsQueried(); + + /** + * Get the minimum freshness timestamp across consuming segments that were queried + */ + long getMinConsumingFreshnessTimeMs(); + + /** * Get total number of documents within the table hit. */ long getTotalDocs(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 214d942..38ae0c3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -178,6 +178,7 @@ public class BrokerResponseNative implements BrokerResponse { } @JsonProperty("numSegmentsQueried") + @Override public long getNumSegmentsQueried() { return _numSegmentsQueried; } @@ -188,6 +189,7 @@ public class BrokerResponseNative implements BrokerResponse { } @JsonProperty("numSegmentsProcessed") + @Override public long getNumSegmentsProcessed() { return _numSegmentsProcessed; } @@ -198,6 +200,7 @@ public class BrokerResponseNative implements BrokerResponse { } @JsonProperty("numSegmentsMatched") + @Override public long getNumSegmentsMatched() { return _numSegmentsMatched; } @@ -208,6 +211,7 @@ public class BrokerResponseNative implements BrokerResponse { } @JsonProperty("numConsumingSegmentsQueried") + @Override public long getNumConsumingSegmentsQueried() { return _numConsumingSegmentsQueried; } @@ -218,6 +222,7 @@ public class BrokerResponseNative implements BrokerResponse { } @JsonProperty("minConsumingFreshnessTimeMs") + @Override public long getMinConsumingFreshnessTimeMs() { return _minConsumingFreshnessTimeMs; } @@ -227,7 +232,6 @@ public class BrokerResponseNative implements BrokerResponse { _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs; } - @JsonProperty("totalDocs") @Override public long getTotalDocs() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 3058539..6b59e1a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -170,7 +170,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - long minConsumingFreshnessTimeMs = Long.MAX_VALUE; + long minConsumingFreshnessTimeMs = minIngestionTimeMs; if (numConsumingSegmentsQueried > 0) { if (minIngestionTimeMs == Long.MAX_VALUE) { LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index d539bf7..ca9d91f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -33,6 +34,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.metrics.BrokerTimer; import org.apache.pinot.common.query.ReduceService; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.GroupBy; @@ -143,9 +145,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> numConsumingSegmentsQueried += Long.parseLong(numConsumingString); } - String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS); - if (minConsumingIndexTsString != null) { - minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs); + String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS); + if (minConsumingFreshnessTimeMsString != null) { + minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), minConsumingFreshnessTimeMs); } String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY); @@ -181,9 +183,6 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> brokerResponseNative.setTotalDocs(numTotalRawDocs); brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached); if (numConsumingSegmentsQueried > 0) { - if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) { - LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried); - } brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried); brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs); } @@ -197,6 +196,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter); brokerMetrics .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter); + + if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) { + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS, + System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS); + } } // Parse the option from request whether to preserve the type diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 627cda7..51e28ed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -54,6 +54,7 @@ public abstract class QueryScheduler { private static final String INVALID_NUM_SCANNED = "-1"; private static final String INVALID_SEGMENTS_COUNT = "-1"; + private static final String INVALID_FRESHNESS_MS = "-1"; private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond"; private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d; @@ -173,6 +174,10 @@ public abstract class QueryScheduler { Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT)); long numSegmentsMatched = Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT)); + long numSegmentsConsuming = + Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT)); + long minConsumingFreshnessMs = + Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS)); if (numDocsScanned > 0) { serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned); @@ -192,11 +197,13 @@ public abstract class QueryScheduler { if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) { LOGGER.info( - "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", - requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs, - timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), - timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), - numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); + "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{}," + + "schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={}," + + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", + requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, + numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), + timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs, + queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); // Limit the dropping log message at most once per second. if (numDroppedLogRateLimiter.tryAcquire()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org