This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9607ca4 Adding execution stats for numSegmentsQueried/Processed/Matched (#3525) 9607ca4 is described below commit 9607ca490275fe9fc3808dba14a59514d06fa11f Author: Kishore Gopalakrishna <g.kish...@gmail.com> AuthorDate: Mon Nov 26 13:51:40 2018 -0800 Adding execution stats for numSegmentsQueried/Processed/Matched (#3525) * Adding execution stats for numSegmentsQueried/Processed/Matched * Handling review comments. Inverting segmentsNotMatched to segmentsMatched * Changing the order in response json * Changing the order in response json * Emitting metrics numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched * Handling review comments, added test case for offline and realtime * Fixing variable name --- .../requesthandler/BaseBrokerRequestHandler.java | 3 +- .../linkedin/pinot/common/metrics/ServerMeter.java | 4 ++- .../pinot/common/response/BrokerResponse.java | 15 ++++++++ .../response/broker/BrokerResponseNative.java | 42 ++++++++++++++++++++-- .../com/linkedin/pinot/common/utils/DataTable.java | 3 ++ .../core/operator/CombineGroupByOperator.java | 2 ++ .../pinot/core/operator/CombineOperator.java | 2 ++ .../pinot/core/operator/ExecutionStatistics.java | 20 +++++++++-- .../operator/blocks/IntermediateResultsBlock.java | 21 +++++++++++ .../query/executor/ServerQueryExecutorV1Impl.java | 11 +++--- .../core/query/reduce/BrokerReduceService.java | 20 +++++++++++ .../core/query/request/ServerQueryRequest.java | 8 ----- .../pinot/core/query/scheduler/QueryScheduler.java | 23 ++++++++---- .../tests/BaseClusterIntegrationTestSet.java | 23 +++++++++++- .../tests/HybridClusterIntegrationTest.java | 6 ++++ .../tests/OfflineClusterIntegrationTest.java | 6 ++++ 16 files changed, 181 insertions(+), 28 deletions(-) diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index a90dc1a..014842c 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -274,9 +274,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) LOGGER.info( - "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", + "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), + brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(), brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java index 1551961..4cda98d 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java @@ -51,7 +51,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { NUM_DOCS_SCANNED("rows", false), NUM_ENTRIES_SCANNED_IN_FILTER("entries", false), NUM_ENTRIES_SCANNED_POST_FILTER("entries", false), - NUM_SEGMENTS_SEARCHED("numSegmentsSearched", false), + NUM_SEGMENTS_QUERIED("numSegmentsQueried", false), + NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", false), + NUM_SEGMENTS_MATCHED("numSegmentsMatched", false), NUM_MISSING_SEGMENTS("segments", false); private final String meterName; diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/com/linkedin/pinot/common/response/BrokerResponse.java index 2b255ea..a2b4f81 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/response/BrokerResponse.java @@ -87,6 +87,21 @@ public interface BrokerResponse { long getNumEntriesScannedPostFilter(); /** + * Get the number of segments queried by the broker after broker side pruning + */ + long getNumSegmentsQueried(); + + /** + * Get the number of segments processed by server after server side pruning + */ + long getNumSegmentsProcessed(); + + /** + * Get number of segments that had at least one matching document + */ + long getNumSegmentsMatched(); + + /** * Get total number of documents within the table hit. */ long getTotalDocs(); diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/com/linkedin/pinot/common/response/broker/BrokerResponseNative.java index e4a251c..1acbb79 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/response/broker/BrokerResponseNative.java @@ -39,9 +39,9 @@ import org.json.JSONObject; * * Supports serialization via JSON. */ -@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions", "numServersQueried", "numServersResponded", - "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "totalDocs", "numGroupsLimitReached", - "timeUsedMs", "segmentStatistics", "traceInfo"}) +@JsonPropertyOrder({ "selectionResults", "aggregationResults", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", + "numSegmentsProcessed", "numSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", + "totalDocs", "timeUsedMs", "segmentStatistics", "traceInfo" }) public class BrokerResponseNative implements BrokerResponse { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -54,6 +54,10 @@ public class BrokerResponseNative implements BrokerResponse { private long _numDocsScanned = 0L; private long _numEntriesScannedInFilter = 0L; private long _numEntriesScannedPostFilter = 0L; + private long _numSegmentsQueried = 0L; + private long _numSegmentsProcessed = 0L; + private long _numSegmentsMatched = 0L; + private long _totalDocs = 0L; private boolean _numGroupsLimitReached = false; private long _timeUsedMs = 0L; @@ -173,6 +177,36 @@ public class BrokerResponseNative implements BrokerResponse { _numEntriesScannedPostFilter = numEntriesScannedPostFilter; } + @JsonProperty("numSegmentsQueried") + public long getNumSegmentsQueried() { + return _numSegmentsQueried; + } + + @JsonProperty("numSegmentsQueried") + public void setNumSegmentsQueried(long numSegmentsQueried) { + _numSegmentsQueried = numSegmentsQueried; + } + + @JsonProperty("numSegmentsProcessed") + public long getNumSegmentsProcessed() { + return _numSegmentsProcessed; + } + + @JsonProperty("numSegmentsProcessed") + public void setNumSegmentsProcessed(long numSegmentsProcessed) { + _numSegmentsProcessed = numSegmentsProcessed; + } + + @JsonProperty("numSegmentsMatched") + public long getNumSegmentsMatched() { + return _numSegmentsMatched; + } + + @JsonProperty("numSegmentsMatched") + public void setNumSegmentsMatched(long numSegmentsMatched) { + _numSegmentsMatched = numSegmentsMatched; + } + @JsonProperty("totalDocs") @Override public long getTotalDocs() { @@ -262,4 +296,6 @@ public class BrokerResponseNative implements BrokerResponse { public int getExceptionsSize() { return _processingExceptions.size(); } + + } diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/DataTable.java index caaa397..53ac4c8 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/DataTable.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/DataTable.java @@ -29,6 +29,9 @@ public interface DataTable { String NUM_DOCS_SCANNED_METADATA_KEY = "numDocsScanned"; String NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY = "numEntriesScannedInFilter"; String NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY = "numEntriesScannedPostFilter"; + String NUM_SEGMENTS_QUERIED = "numSegmentsQueried"; + String NUM_SEGMENTS_PROCESSED = "numSegmentsProcessed"; + String NUM_SEGMENTS_MATCHED = "numSegmentsMatched"; String TOTAL_DOCS_METADATA_KEY = "totalDocs"; String NUM_GROUPS_LIMIT_REACHED_KEY = "numGroupsLimitReached"; String TIME_USED_MS_METADATA_KEY = "timeUsedMs"; diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineGroupByOperator.java index 023b7eb..6ffc032 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineGroupByOperator.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineGroupByOperator.java @@ -192,6 +192,8 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned()); mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter()); mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter()); + mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed()); + mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched()); mergedBlock.setNumTotalRawDocs(executionStatistics.getNumTotalRawDocs()); // NOTE: numGroups might go slightly over numGroupsLimit because the comparison is not atomic if (numGroups.get() >= _numGroupsLimit) { diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineOperator.java index 107a687..549cba7 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineOperator.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/CombineOperator.java @@ -187,6 +187,8 @@ public class CombineOperator extends BaseOperator<IntermediateResultsBlock> { mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter()); mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter()); mergedBlock.setNumTotalRawDocs(executionStatistics.getNumTotalRawDocs()); + mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed()); + mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched()); return mergedBlock; } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/ExecutionStatistics.java b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/ExecutionStatistics.java index 2a1d9eb..c168d89 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/ExecutionStatistics.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/ExecutionStatistics.java @@ -23,7 +23,9 @@ public class ExecutionStatistics { private long _numEntriesScannedInFilter; private long _numEntriesScannedPostFilter; private long _numTotalRawDocs; - + private long _numSegmentsProcessed; + private long _numSegmentsMatched; + public ExecutionStatistics() { } @@ -33,6 +35,8 @@ public class ExecutionStatistics { _numEntriesScannedInFilter = numEntriesScannedInFilter; _numEntriesScannedPostFilter = numEntriesScannedPostFilter; _numTotalRawDocs = numTotalRawDocs; + _numSegmentsProcessed = 1; + _numSegmentsMatched = (numDocsScanned == 0) ? 0 : 1; } public long getNumDocsScanned() { @@ -51,6 +55,14 @@ public class ExecutionStatistics { return _numTotalRawDocs; } + public long getNumSegmentsProcessed() { + return _numSegmentsProcessed; + } + + public long getNumSegmentsMatched() { + return _numSegmentsMatched; + } + /** * Merge another execution statistics into the current one. * @@ -61,6 +73,8 @@ public class ExecutionStatistics { _numEntriesScannedInFilter += executionStatisticsToMerge._numEntriesScannedInFilter; _numEntriesScannedPostFilter += executionStatisticsToMerge._numEntriesScannedPostFilter; _numTotalRawDocs += executionStatisticsToMerge._numTotalRawDocs; + _numSegmentsProcessed += executionStatisticsToMerge._numSegmentsProcessed; + _numSegmentsMatched += executionStatisticsToMerge._numSegmentsMatched; } @Override @@ -69,6 +83,8 @@ public class ExecutionStatistics { + "\n numDocsScanned: " + _numDocsScanned + "\n numEntriesScannedInFilter: " + _numEntriesScannedInFilter + "\n numEntriesScannedPostFilter: " + _numEntriesScannedPostFilter - + "\n numTotalRawDocs: " + _numTotalRawDocs; + + "\n numTotalRawDocs: " + _numTotalRawDocs + + "\n numSegmentsProcessed: " + _numSegmentsProcessed + + "\n numSegmentsMatched: " + _numSegmentsMatched; } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/blocks/IntermediateResultsBlock.java index 80da626..0b8b1eb 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/operator/blocks/IntermediateResultsBlock.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/operator/blocks/IntermediateResultsBlock.java @@ -53,6 +53,8 @@ public class IntermediateResultsBlock implements Block { private long _numEntriesScannedInFilter; private long _numEntriesScannedPostFilter; private long _numTotalRawDocs; + private long _numSegmentsProcessed; + private long _numSegmentsMatched; private boolean _numGroupsLimitReached; /** @@ -172,6 +174,22 @@ public class IntermediateResultsBlock implements Block { public void setNumEntriesScannedPostFilter(long numEntriesScannedPostFilter) { _numEntriesScannedPostFilter = numEntriesScannedPostFilter; } + + public long getNumSegmentsProcessed() { + return _numSegmentsProcessed; + } + + public void setNumSegmentsProcessed(long numSegmentsProcessed) { + _numSegmentsProcessed = numSegmentsProcessed; + } + + public long getNumSegmentsMatched() { + return _numSegmentsMatched; + } + + public void setNumSegmentsMatched(long numSegmentsMatched) { + _numSegmentsMatched = numSegmentsMatched; + } public void setNumTotalRawDocs(long numTotalRawDocs) { _numTotalRawDocs = numTotalRawDocs; @@ -279,6 +297,9 @@ public class IntermediateResultsBlock implements Block { .put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, String.valueOf(_numEntriesScannedInFilter)); dataTable.getMetadata() .put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, String.valueOf(_numEntriesScannedPostFilter)); + dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_PROCESSED, String.valueOf(_numSegmentsProcessed)); + dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_MATCHED, String.valueOf(_numSegmentsMatched)); + dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(_numTotalRawDocs)); if (_numGroupsLimitReached) { dataTable.getMetadata().put(DataTable.NUM_GROUPS_LIMIT_REACHED_KEY, "true"); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 2acb3f9..1e8ef70 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -131,17 +131,17 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING); long totalRawDocs = pruneSegments(tableDataManager, segmentDataManagers, queryRequest); segmentPruneTimer.stopAndRecord(); - - int numSegmentsMatched = segmentDataManagers.size(); - queryRequest.setSegmentCountAfterPruning(numSegmentsMatched); - LOGGER.debug("Matched {} segments", numSegmentsMatched); - if (numSegmentsMatched == 0) { + int numSegmentsMatchedAfterPruning = segmentDataManagers.size(); + LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning); + if (numSegmentsMatchedAfterPruning == 0) { dataTable = DataTableBuilder.buildEmptyDataTable(brokerRequest); Map<String, String> metadata = dataTable.getMetadata(); metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(totalRawDocs)); metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0"); metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0"); metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0"); + metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0"); + metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0"); } else { TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN); Plan globalQueryPlan = @@ -188,6 +188,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { queryProcessingTimer.stopAndRecord(); long queryProcessingTime = queryProcessingTimer.getDurationMs(); + dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Long.toString(segmentDataManagers.size())); dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime)); LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime); LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/reduce/BrokerReduceService.java index 1c43b40..e52959d 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/reduce/BrokerReduceService.java @@ -76,6 +76,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> long numDocsScanned = 0L; long numEntriesScannedInFilter = 0L; long numEntriesScannedPostFilter = 0L; + long numSegmentsQueried = 0L; + long numSegmentsProcessed = 0L; + long numSegmentsMatched = 0L; long numTotalRawDocs = 0L; boolean numGroupsLimitReached = false; @@ -116,6 +119,20 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> if (numEntriesScannedPostFilterString != null) { numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString); } + String numSegmentsQueriedString = metadata.get(DataTable.NUM_SEGMENTS_QUERIED); + if (numSegmentsQueriedString != null) { + numSegmentsQueried += Long.parseLong(numSegmentsQueriedString); + } + + String numSegmentsProcessedString = metadata.get(DataTable.NUM_SEGMENTS_PROCESSED); + if (numSegmentsProcessedString != null) { + numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString); + } + String numSegmentsMatchedString = metadata.get(DataTable.NUM_SEGMENTS_MATCHED); + if (numSegmentsMatchedString != null) { + numSegmentsMatched += Long.parseLong(numSegmentsMatchedString); + } + String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY); if (numTotalRawDocsString != null) { numTotalRawDocs += Long.parseLong(numTotalRawDocsString); @@ -143,6 +160,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative> brokerResponseNative.setNumDocsScanned(numDocsScanned); brokerResponseNative.setNumEntriesScannedInFilter(numEntriesScannedInFilter); brokerResponseNative.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter); + brokerResponseNative.setNumSegmentsQueried(numSegmentsQueried); + brokerResponseNative.setNumSegmentsProcessed(numSegmentsProcessed); + brokerResponseNative.setNumSegmentsMatched(numSegmentsMatched); brokerResponseNative.setTotalDocs(numTotalRawDocs); brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/request/ServerQueryRequest.java index 1fad121..aff30fc 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/request/ServerQueryRequest.java @@ -191,12 +191,4 @@ public class ServerQueryRequest { public Set<String> getSelectionColumns() { return _selectionColumns; } - - public int getSegmentCountAfterPruning() { - return _segmentCountAfterPruning; - } - - public void setSegmentCountAfterPruning(int segmentCountAfterPruning) { - _segmentCountAfterPruning = segmentCountAfterPruning; - } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/scheduler/QueryScheduler.java index 1edb97c..7f906c5 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/scheduler/QueryScheduler.java @@ -46,7 +46,8 @@ import org.slf4j.LoggerFactory; public abstract class QueryScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class); private static final String INVALID_NUM_SCANNED = "-1"; - + private static final String INVALID_SEGMENTS_COUNT = "-1"; + protected final ServerMetrics serverMetrics; protected final QueryExecutor queryExecutor; protected final ResourceManager resourceManager; @@ -146,7 +147,12 @@ public abstract class QueryScheduler { long numEntriesScannedInFilter = Long.parseLong( dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, INVALID_NUM_SCANNED)); long numEntriesScannedPostFilter = Long.parseLong( - dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, INVALID_NUM_SCANNED)); + dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_QUERIED, INVALID_NUM_SCANNED)); + long numSegmentsProcessed = Long.parseLong( + dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT)); + long numSegmentsMatched = Long.parseLong( + dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT)); + if (numDocsScanned > 0) { serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned); } @@ -160,14 +166,17 @@ public abstract class QueryScheduler { } TimerContext timerContext = queryRequest.getTimerContext(); + int numSegmentsQueried = queryRequest.getSegmentsToQuery().size(); LOGGER.info( - "Processed requestId={},table={},reqSegments={},prunedToSegmentCount={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", - requestId, tableNameWithType, queryRequest.getSegmentsToQuery().size(), - queryRequest.getSegmentCountAfterPruning(), timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), + "Processed requestId={},table={},Segments(Queried/processed/matched)={}/{}/{},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", + requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, + timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); - serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_SEARCHED, - queryRequest.getSegmentCountAfterPruning()); + + serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried); + serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed); + serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched); return responseData; } diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTestSet.java index a175fce..ec92ce6 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -39,6 +39,7 @@ import org.apache.helix.model.InstanceConfig; import org.json.JSONArray; import org.json.JSONObject; import org.testng.Assert; +import org.testng.annotations.Test; /** @@ -136,7 +137,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati + "FlightDate IN ('2014-12-09', '2014-10-05') GROUP BY ActualElapsedTime, OriginStateFips " + "HAVING SUM(ArrDelay) <> 6325.973 AND AVG(CAST(CRSDepTime AS DOUBLE)) <= 1569.8755 OR SUM(TaxiIn) = 1003.87274")); } - + /** + * Test to ensure that broker response contains expected stats + * + * @throws Exception + */ + public void testBrokerResponseMetadata() throws Exception { + String[] pqlQueries = new String[] { // + "SELECT count(*) FROM mytable", // matching query + "SELECT count(*) FROM mytable where non_existing_column='non_existing_value", // query that does not match any row + "SELECT count(*) FROM mytable_foo" // query a non existing table + }; + String[] statNames = new String[] { "totalDocs", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", + "numSegmentsMatched", "numDocsScanned", "totalDocs", "timeUsedMs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter" }; + + for (String query : pqlQueries) { + JSONObject response = postQuery(query); + for (String statName : statNames) { + Assert.assertTrue(response.has(statName)); + } + } + } public void testVirtualColumnQueries() { // Check that there are no virtual columns in the query results ResultSetGroup resultSetGroup = getPinotConnection().execute("select * from mytable"); diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java index 17ae902..023cae3 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -251,6 +251,12 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet @Test @Override + public void testBrokerResponseMetadata() throws Exception { + super.testBrokerResponseMetadata(); + } + + @Test + @Override public void testVirtualColumnQueries() { super.testVirtualColumnQueries(); } diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java index 40f7ba7..383c95b 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -358,6 +358,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } @Test + @Override + public void testBrokerResponseMetadata() throws Exception { + super.testBrokerResponseMetadata(); + } + + @Test public void testUDF() throws Exception { String pqlQuery = "SELECT COUNT(*) FROM mytable GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS')"; JSONObject response = postQuery(pqlQuery); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org