This is an automated email from the ASF dual-hosted git repository. sunithabeeram pushed a commit to branch consumingStats in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 788e7cc151e407d5e1c13f1da748b270e96d6aac Author: Sunitha Beeram <sbee...@sbeeram-ld2.linkedin.biz> AuthorDate: Tue May 21 10:55:51 2019 -0700 Emit freshnessLag metric from broker for queries hitting consuming segments --- .../requesthandler/BaseBrokerRequestHandler.java | 6 ++++-- .../org/apache/pinot/common/metrics/BrokerMeter.java | 3 +++ .../apache/pinot/common/response/BrokerResponse.java | 10 ++++++++++ .../common/response/broker/BrokerResponseNative.java | 1 - .../query/executor/ServerQueryExecutorV1Impl.java | 4 +++- .../pinot/core/query/reduce/BrokerReduceService.java | 20 ++++++++++++++++---- .../pinot/core/query/scheduler/QueryScheduler.java | 12 +++++++++--- 7 files changed, 45 insertions(+), 11 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 7bc689f..5f503dd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) { // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) LOGGER.info( - "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} " - + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, + "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched/consuming):{}/{}/{}/{}," + + " consumingFreshnessTimeMs:{}," + + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, brokerRequest.getQuerySource().getTableName(), totalTimeMs, 
brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(), + brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), StringUtils.substring(query, 0, _queryLogLength)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 170e745..facd15f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -63,6 +63,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter { ENTRIES_SCANNED_IN_FILTER("documents", false), ENTRIES_SCANNED_POST_FILTER("documents", false), + // metric tracking the freshness lag for consuming segments + FRESHNESS_LAG_MS("latency", false), + REQUEST_CONNECTION_TIMEOUTS("timeouts", false), HELIX_ZOOKEEPER_RECONNECTS("reconnects", true), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 72b95e2..ec2960e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -98,6 +98,16 @@ public interface BrokerResponse { long getNumSegmentsMatched(); /** + * Get number of consuming segments that were queried. 
+ */ + long getNumConsumingSegmentsQueried(); + + /** + * Get the minimum freshness timestamp across consuming segments that were queried + */ + long getMinConsumingFreshnessTimeMs(); + + /** * Get total number of documents within the table hit. */ long getTotalDocs(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 214d942..1ccedd6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -227,7 +227,6 @@ public class BrokerResponseNative implements BrokerResponse { _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs; } - @JsonProperty("totalDocs") @Override public long getTotalDocs() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 3058539..5f70dc2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -170,11 +170,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - long minConsumingFreshnessTimeMs = Long.MAX_VALUE; + long minConsumingFreshnessTimeMs = -1; if (numConsumingSegmentsQueried > 0) { if (minIngestionTimeMs == Long.MAX_VALUE) { LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! 
Using indexTime instead"); minConsumingFreshnessTimeMs = minIndexTimeMs; + } else { + minConsumingFreshnessTimeMs = minIngestionTimeMs; } LOGGER.debug("Querying {} consuming segments with min minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried, minConsumingFreshnessTimeMs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index d539bf7..dde809e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory; @ThreadSafe public class BrokerReduceService implements ReduceService<BrokerResponseNative> { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class); + private static final long INVALID_FRESHNESS = -1; @Nonnull @Override @@ -143,9 +144,13 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> numConsumingSegmentsQueried += Long.parseLong(numConsumingString); } - String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS); - if (minConsumingIndexTsString != null) { - minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs); + String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS); + if (minConsumingFreshnessTimeMsString != null) { + long freshness = Long.parseLong(minConsumingFreshnessTimeMsString); + // ignore invalid values + if (freshness > INVALID_FRESHNESS) { + minConsumingFreshnessTimeMs = Math.min(freshness, minConsumingFreshnessTimeMs); + } } String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY); @@ -182,7 +187,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> 
brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached); if (numConsumingSegmentsQueried > 0) { if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) { - LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried); + LOGGER.error("Invalid freshness time across {} consuming segments", numConsumingSegmentsQueried); + // use the invalid value (-1) for clear logging + minConsumingFreshnessTimeMs = INVALID_FRESHNESS; } brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried); brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs); @@ -197,6 +204,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter); brokerMetrics .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter); + + if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) { + brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.FRESHNESS_LAG_MS, + System.currentTimeMillis() - minConsumingFreshnessTimeMs); + } } // Parse the option from request whether to preserve the type diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 627cda7..dcd5ee6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -54,6 +54,7 @@ public abstract class QueryScheduler { private static final String INVALID_NUM_SCANNED = "-1"; private static final String INVALID_SEGMENTS_COUNT = "-1"; + private static final String INVALID_FRESHNESS_MS = "-1"; private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond"; private static final double 
DEFAULT_QUERY_LOG_MAX_RATE = 10_000d; @@ -173,6 +174,10 @@ public abstract class QueryScheduler { Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT)); long numSegmentsMatched = Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT)); + long numSegmentsConsuming = + Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT)); + long minConsumingFreshnessMs = + Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS)); if (numDocsScanned > 0) { serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned); @@ -192,10 +197,11 @@ public abstract class QueryScheduler { if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) { LOGGER.info( - "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", - requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs, + "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", + requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), - timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), + timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs, + queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); // Limit the dropping log 
message at most once per second. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org