This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new de16a0a35d specify how many segments were pruned by each server segment pruner (#8884) de16a0a35d is described below commit de16a0a35d93f3fe0393df563015e7dd1298ceb9 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Tue Jul 5 16:50:23 2022 +0200 specify how many segments were pruned by each server segment pruner (#8884) * specify how many segments were pruned by each server segment pruner * force the query log when there are invalid segments * change header to javadoc * Add a warning section in Controller UI query view. Initially there is only one warning, which is shown when segments were pruned because they were invalid * Correct an assertion * Apply suggestions from code review Co-authored-by: Rong Rong <walterddr.walter...@gmail.com> * Don't report empty segments as invalid * Add test on SegmentPrunerService * Update pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> * Move new literals to the end of the enum list * Correctly apply suggested change * remove trailing whitespace * fix a compilation error Co-authored-by: Rong Rong <ro...@apache.org> Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> --- .../apache/pinot/common/metrics/ServerMeter.java | 6 +- .../pinot/common/response/BrokerResponse.java | 42 ++++++++ .../response/broker/BrokerResponseNative.java | 39 +++++++ .../org/apache/pinot/common/utils/DataTable.java | 3 + .../src/main/resources/app/pages/Query.tsx | 21 ++++ .../core/query/config/SegmentPrunerConfig.java | 2 +- .../query/executor/ServerQueryExecutorV1Impl.java | 17 ++- .../pinot/core/query/pruner/SegmentPruner.java | 2 + .../core/query/pruner/SegmentPrunerProvider.java | 7 +- .../core/query/pruner/SegmentPrunerService.java | 76 +++++++++++-- ...entPruner.java => SegmentPrunerStatistics.java} | 55 +++++----- .../pinot/core/query/reduce/BaseReduceService.java | 23 +++- .../pinot/core/query/scheduler/QueryScheduler.java | 30 +++++- .../query/pruner/SegmentPrunerServiceTest.java | 117 +++++++++++++++++++++ 14 files changed, 394 insertions(+), 46 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 0cbd12c537..776a4e1aae 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -75,7 +75,11 @@ public enum ServerMeter implements AbstractMetrics.Meter { // Netty connection metrics NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true), NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true), - NETTY_CONNECTION_BYTES_SENT("nettyConnection", true); + NETTY_CONNECTION_BYTES_SENT("nettyConnection", true), + + NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedInvalid", false), + NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false), + NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),; private final String _meterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index aaddaaa9cb..7e269f6421 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -241,6 +241,48 @@ public interface BrokerResponse { */ void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer); + /** + * Get the total number of segments pruned due to invalid data or schema. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + long getNumSegmentsPrunedInvalid(); + + /** + * Set the total number of segments pruned due to invalid data or schema. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid); + + /** + * Get the total number of segments pruned by applying the limit optimization. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + long getNumSegmentsPrunedByLimit(); + + /** + * Set the total number of segments pruned by applying the limit optimization. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit); + + /** + * Get the total number of segments pruned applying value optimizations, like bloom filters. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + long getNumSegmentsPrunedByValue(); + + /** + * Set the total number of segments pruned applying value optimizations, like bloom filters. + * + * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()} + */ + void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue); + /** * Get the total number of segments with an EmptyFilterOperator when Explain Plan is called */ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index a8fdd7a9d5..d9df0dc67e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -83,6 +83,9 @@ public class BrokerResponseNative implements BrokerResponse { private long _realtimeTotalCpuTimeNs = 0L; private long _numSegmentsPrunedByBroker = 0L; private long _numSegmentsPrunedByServer = 0L; + private long _numSegmentsPrunedInvalid = 0L; + private long _numSegmentsPrunedByLimit = 0L; + private long _numSegmentsPrunedByValue = 0L; private long _explainPlanNumEmptyFilterSegments = 0L; private long _explainPlanNumMatchAllFilterSegments = 0L; private int _numRowsResultSet = 0; @@ -245,6 +248,42 @@ public class BrokerResponseNative implements BrokerResponse { _numSegmentsPrunedByServer = numSegmentsPrunedByServer; } + @JsonProperty("numSegmentsPrunedInvalid") + @Override + public long getNumSegmentsPrunedInvalid() { + return _numSegmentsPrunedInvalid; + } + + @JsonProperty("numSegmentsPrunedInvalid") + @Override + public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) { + _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid; + } + + @JsonProperty("numSegmentsPrunedByLimit") + @Override + public long getNumSegmentsPrunedByLimit() { + return _numSegmentsPrunedByLimit; + } + + @JsonProperty("numSegmentsPrunedByLimit") + @Override + public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) { + _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit; + } + + @JsonProperty("numSegmentsPrunedByValue") + @Override + public long getNumSegmentsPrunedByValue() { + return _numSegmentsPrunedByValue; + } + + @JsonProperty("numSegmentsPrunedByValue") + @Override + public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) { + _numSegmentsPrunedByValue = numSegmentsPrunedByValue; + } + @JsonProperty("explainPlanNumEmptyFilterSegments") @Override public long getExplainPlanNumEmptyFilterSegments() { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java index c01af3ab11..c27565b0ec 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java @@ -116,6 +116,9 @@ public interface DataTable { SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG), RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG), NUM_SEGMENTS_PRUNED_BY_SERVER("numSegmentsPrunedByServer", MetadataValueType.INT), + NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedByInvalid", MetadataValueType.INT), + NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", MetadataValueType.INT), + NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", MetadataValueType.INT), EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS("explainPlanNumEmptyFilterSegments", MetadataValueType.INT), EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS("explainPlanNumMatchAllFilterSegments", MetadataValueType.INT); diff --git a/pinot-controller/src/main/resources/app/pages/Query.tsx b/pinot-controller/src/main/resources/app/pages/Query.tsx index 79747be1cf..99c691f8e9 100644 --- a/pinot-controller/src/main/resources/app/pages/Query.tsx +++ b/pinot-controller/src/main/resources/app/pages/Query.tsx @@ -190,6 +190,8 @@ const QueryPage = () => { records: [], }); + const [warnings, setWarnings] = useState<Array<string>>([]); + const [checked, setChecked] = React.useState({ tracing: queryParam.get('tracing') === 'true', showResultJSON: false, @@ -295,10 +297,22 @@ const QueryPage = () => { setResultData(results.result || { columns: [], records: [] }); setQueryStats(results.queryStats || { columns: responseStatCols, records: [] }); setOutputResult(JSON.stringify(results.data, null, 2) || ''); + setWarnings(extractWarnings(results)); setQueryLoader(false); queryExecuted.current = false; }; + const extractWarnings = (result) => { + const warnings: Array<string> = []; + const numSegmentsPrunedInvalid = result.data.numSegmentsPrunedInvalid; + if (numSegmentsPrunedInvalid) { + warnings.push(`There are ${numSegmentsPrunedInvalid} invalid segment/s. This usually means that they were ` + + `created with an older schema. ` + + `Please reload the table in order to refresh these segments to the new schema.`); + } + return warnings; + } + const fetchSQLData = async (tableName) => { setQueryLoader(true); const result = await PinotMethodUtils.getTableSchemaData(tableName); @@ -517,6 +531,13 @@ const QueryPage = () => { </Alert> ) : ( <> + { + warnings.map(warn => + <Alert severity="warning" className={classes.sqlError}> + {warn} + </Alert> + ) + } <Grid item xs style={{ backgroundColor: 'white' }}> {resultData.columns.length ? ( <> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java index 98c6c3ee0c..604bf10801 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java @@ -28,7 +28,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; * Config for SegmentPruner. */ public class SegmentPrunerConfig { - private static final String SEGMENT_PRUNER_NAMES_KEY = "class"; + public static final String SEGMENT_PRUNER_NAMES_KEY = "class"; private final int _numSegmentPruners; private final List<String> _segmentPrunerNames; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index e0c086f5c3..851e5f21b1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -58,6 +58,7 @@ import org.apache.pinot.core.plan.maker.PlanMaker; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.config.QueryExecutorConfig; import org.apache.pinot.core.query.pruner.SegmentPrunerService; +import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; @@ -289,7 +290,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING); int totalSegments = indexSegments.size(); - List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext); + SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics(); + List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats); segmentPruneTimer.stopAndRecord(); int numSelectedSegments = selectedSegments.size(); LOGGER.debug("Matched {} segments after pruning", numSelectedSegments); @@ -310,6 +312,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0"); metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0"); metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(totalSegments)); + addPrunerStats(metadata, prunerStats); return dataTable; } else { TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN); @@ -322,12 +325,14 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { DataTable dataTable = queryContext.isExplain() ? processExplainPlanQueries(queryPlan) : queryPlan.execute(); planExecTimer.stopAndRecord(); + Map<String, String> metadata = dataTable.getMetadata(); // Update the total docs in the metadata based on the un-pruned segments - dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs)); + metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs)); // Set the number of pruned segments. This count does not include the segments which returned empty filters int prunedSegments = totalSegments - numSelectedSegments; - dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments)); + metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments)); + addPrunerStats(metadata, prunerStats); return dataTable; } @@ -575,4 +580,10 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } } + + private void addPrunerStats(Map<String, String> metadata, SegmentPrunerStatistics prunerStats) { + metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments())); + metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), String.valueOf(prunerStats.getLimitPruned())); + metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), String.valueOf(prunerStats.getValuePruned())); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java index 2d06bd2df6..7953db71e5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java @@ -40,6 +40,8 @@ public interface SegmentPruner { /** * Prunes the segments based on the query, returns the segments that are not pruned. * <p>Override this method for the pruner logic. + * + * @param segments The list of segments to be pruned. Implementations must not modify the list. */ List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java index dd392d15e7..e09d54590d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java @@ -35,9 +35,12 @@ public class SegmentPrunerProvider { private static final Map<String, Class<? extends SegmentPruner>> PRUNER_MAP = new HashMap<>(); + public static final String COLUMN_VALUE_SEGMENT_PRUNER_NAME = "columnvaluesegmentpruner"; + public static final String SELECTION_QUERY_SEGMENT_PRUNER_NAME = "selectionquerysegmentpruner"; + static { - PRUNER_MAP.put("columnvaluesegmentpruner", ColumnValueSegmentPruner.class); - PRUNER_MAP.put("selectionquerysegmentpruner", SelectionQuerySegmentPruner.class); + PRUNER_MAP.put(COLUMN_VALUE_SEGMENT_PRUNER_NAME, ColumnValueSegmentPruner.class); + PRUNER_MAP.put(SELECTION_QUERY_SEGMENT_PRUNER_NAME, SelectionQuerySegmentPruner.class); } @Nullable diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java index a53191e4b7..5c79ca1c2d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java @@ -19,7 +19,10 @@ package org.apache.pinot.core.query.pruner; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; import org.apache.pinot.core.query.config.SegmentPrunerConfig; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; @@ -37,9 +40,11 @@ public class SegmentPrunerService { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPrunerService.class); private final List<SegmentPruner> _segmentPruners; + private final Map<SegmentPruner, BiConsumer<SegmentPrunerStatistics, Integer>> _prunerStatsUpdaters; public SegmentPrunerService(SegmentPrunerConfig config) { int numPruners = config.numSegmentPruners(); + _prunerStatsUpdaters = new HashMap<>(); _segmentPruners = new ArrayList<>(numPruners); for (int i = 0; i < numPruners; i++) { @@ -49,25 +54,59 @@ public class SegmentPrunerService { config.getSegmentPrunerConfig(i)); if (pruner != null) { _segmentPruners.add(pruner); + switch (segmentPrunerName.toLowerCase()) { + case SegmentPrunerProvider.SELECTION_QUERY_SEGMENT_PRUNER_NAME: + _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setLimitPruned); + break; + case SegmentPrunerProvider.COLUMN_VALUE_SEGMENT_PRUNER_NAME: + _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setValuePruned); + break; + default: + _prunerStatsUpdaters.put(pruner, (stats, value) -> { }); + break; + } } else { LOGGER.warn("could not create segment pruner: {}", segmentPrunerName); } } + assert _segmentPruners.stream() + .allMatch(_prunerStatsUpdaters::containsKey) + : "No defined stats updater for pruner " + _segmentPruners.stream() + .filter(p -> !_prunerStatsUpdaters.containsKey(p)) + .findAny().orElseThrow(IllegalStateException::new); } /** * Prunes the segments based on the query request, returns the segments that are not pruned. + * + * @deprecated this method is here for compatibility reasons and may be removed soon. + * Call {@link #prune(List, QueryContext, SegmentPrunerStatistics)} instead + * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an + * undefined way. Therefore, this list should not be used after calling this method. */ + @Deprecated public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) { + return prune(segments, query, new SegmentPrunerStatistics()); + } + + /** + * Prunes the segments based on the query request, returns the segments that are not pruned. + * + * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an + * undefined way. Therefore, this list should not be used after calling this method. + */ + public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, SegmentPrunerStatistics stats) { try (InvocationScope scope = Tracing.getTracer().createScope(SegmentPrunerService.class)) { - segments = removeInvalidSegments(segments, query); + segments = removeInvalidSegments(segments, query, stats); int invokedPrunersCount = 0; for (SegmentPruner segmentPruner : _segmentPruners) { if (segmentPruner.isApplicableTo(query)) { invokedPrunersCount++; try (InvocationScope prunerScope = Tracing.getTracer().createScope(segmentPruner.getClass())) { - prunerScope.setNumSegments(segments.size()); + int originalSegmentsSize = segments.size(); + prunerScope.setNumSegments(originalSegmentsSize); segments = segmentPruner.prune(segments, query); + _prunerStatsUpdaters.get(segmentPruner).accept(stats, originalSegmentsSize - segments.size()); } } } @@ -76,18 +115,41 @@ public class SegmentPrunerService { return segments; } - private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query) { + /** + * Filters the given list, returning a list that only contains the valid segments, modifying the list received as + * argument. + * + * <p> + * This is a destructive operation. The list received as arguments may be modified, so only the returned list should + * be used. + * </p> + * + * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an + * undefined way. Therefore, this list should not be used after calling this method. + * @return the new list with filtered elements. This is the list that have to be used. + */ + private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query, + SegmentPrunerStatistics stats) { int selected = 0; + int invalid = 0; for (IndexSegment segment : segments) { - if (!isInvalidSegment(segment, query)) { - segments.set(selected++, segment); + if (!isEmptySegment(segment)) { + if (isInvalidSegment(segment, query)) { + invalid++; + } else { + segments.set(selected++, segment); + } } } + stats.setInvalidSegments(invalid); return segments.subList(0, selected); } + private static boolean isEmptySegment(IndexSegment segment) { + return segment.getSegmentMetadata().getTotalDocs() == 0; + } + private static boolean isInvalidSegment(IndexSegment segment, QueryContext query) { - return segment.getSegmentMetadata().getTotalDocs() == 0 - || !segment.getColumnNames().containsAll(query.getColumns()); + return !segment.getColumnNames().containsAll(query.getColumns()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java similarity index 54% copy from pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java copy to pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java index 2d06bd2df6..39d068cf80 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java @@ -18,28 +18,35 @@ */ package org.apache.pinot.core.query.pruner; -import java.util.List; -import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.segment.spi.IndexSegment; -import org.apache.pinot.spi.env.PinotConfiguration; - - -public interface SegmentPruner { - - /** - * Initializes the segment pruner. - */ - void init(PinotConfiguration config); - - /** - * Inspect the query context to determine if the pruner should be applied - * @return true if the pruner applies to the query - */ - boolean isApplicableTo(QueryContext query); - - /** - * Prunes the segments based on the query, returns the segments that are not pruned. - * <p>Override this method for the pruner logic. - */ - List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query); +public class SegmentPrunerStatistics { + + private int _invalidSegments; + + private int _valuePruned; + + private int _limitPruned; + + public int getInvalidSegments() { + return _invalidSegments; + } + + public void setInvalidSegments(int invalidSegments) { + _invalidSegments = invalidSegments; + } + + public int getValuePruned() { + return _valuePruned; + } + + public void setValuePruned(int valuePruned) { + _valuePruned = valuePruned; + } + + public int getLimitPruned() { + return _limitPruned; + } + + public void setLimitPruned(int limitPruned) { + _limitPruned = limitPruned; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java index 662c08cabc..dc3a450d50 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.LongConsumer; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -143,6 +144,9 @@ public abstract class BaseReduceService { private long _offlineTotalCpuTimeNs = 0L; private long _realtimeTotalCpuTimeNs = 0L; private long _numSegmentsPrunedByServer = 0L; + private long _numSegmentsPrunedInvalid = 0L; + private long _numSegmentsPrunedByLimit = 0L; + private long _numSegmentsPrunedByValue = 0L; private long _explainPlanNumEmptyFilterSegments = 0L; private long _explainPlanNumMatchAllFilterSegments = 0L; private boolean _numGroupsLimitReached = false; @@ -233,10 +237,11 @@ public abstract class BaseReduceService { _realtimeTotalCpuTimeNs = _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs; - String numSegmentsPrunedByServer = metadata.get(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName()); - if (numSegmentsPrunedByServer != null) { - _numSegmentsPrunedByServer += Long.parseLong(numSegmentsPrunedByServer); - } + withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER, + l -> _numSegmentsPrunedByServer += l); + withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, l -> _numSegmentsPrunedInvalid += l); + withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, l -> _numSegmentsPrunedByLimit += l); + withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, l -> _numSegmentsPrunedByValue += l); String explainPlanNumEmptyFilterSegments = metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName()); @@ -286,6 +291,9 @@ public abstract class BaseReduceService { brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs); brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs); brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer); + brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid); + brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit); + brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue); brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments); brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments); if (_numConsumingSegmentsProcessed > 0) { @@ -324,5 +332,12 @@ public abstract class BaseReduceService { } } } + + private void withNotNullLongMetadata(Map<String, String> metadata, MetadataKey key, LongConsumer consumer) { + String strValue = metadata.get(key.getName()); + if (strValue != null) { + consumer.accept(Long.parseLong(strValue)); + } + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 7ca4485ca4..e4344dd1bb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -175,6 +175,15 @@ public abstract class QueryScheduler { dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsMatched = Long.parseLong( dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT)); + long numSegmentsPrunedInvalid = Long.parseLong( + dataTableMetadata.getOrDefault( + MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT)); + long numSegmentsPrunedByLimit = Long.parseLong( + dataTableMetadata.getOrDefault( + MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT)); + long numSegmentsPrunedByValue = Long.parseLong( + dataTableMetadata.getOrDefault( + MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsConsuming = Long.parseLong( dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); long minConsumingFreshnessMs = Long.parseLong( @@ -231,12 +240,14 @@ public abstract class QueryScheduler { // Please keep the format as name=value comma-separated with no spaces // Please add new entries at the end - if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) { - LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{}," + if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) { + LOGGER.info("Processed requestId={},table={}," + + "segments(queried/processed/matched/consuming/invalid/limit/value)={}/{}/{}/{}/{}/{}/{}," + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={}," + "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}," + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType, - numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs, + numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, + numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION), timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION), @@ -266,6 +277,12 @@ public abstract class QueryScheduler { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried); _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed); _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched); + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID, + numSegmentsPrunedInvalid); + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT, + numSegmentsPrunedByLimit); + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE, + numSegmentsPrunedByValue); return responseBytes; } @@ -276,12 +293,17 @@ public abstract class QueryScheduler { * TODO: come up with other criteria for forcing a log and come up with better numbers * */ - private boolean forceLog(long schedulerWaitMs, long numDocsScanned) { + private boolean forceLog(long schedulerWaitMs, long numDocsScanned, long numSegmentsPrunedInvalid) { // If scheduler wait time is larger than 100ms, force the log if (schedulerWaitMs > 100L) { return true; } + // If there are invalid segments, force the log + if (numSegmentsPrunedInvalid > 0) { + return true; + } + // If the number of document scanned is larger than 1 million rows, force the log return numDocsScanned > 1_000_000L; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java new file mode 100644 index 0000000000..334cfc1d3c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.core.query.pruner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.pinot.core.query.config.SegmentPrunerConfig; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class SegmentPrunerServiceTest { + private final SegmentPrunerConfig _emptyPrunerConf; + + public SegmentPrunerServiceTest() { + PinotConfiguration pinotConf = new PinotConfiguration(); + pinotConf.setProperty(SegmentPrunerConfig.SEGMENT_PRUNER_NAMES_KEY, "[]"); + _emptyPrunerConf = new SegmentPrunerConfig(pinotConf); + } + + @Test + public void notEmptyValidSegmentsAreNotPruned() { + SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf); + IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2"); + + SegmentPrunerStatistics stats = new SegmentPrunerStatistics(); + + List<IndexSegment> indexes = new ArrayList<>(); + indexes.add(indexSegment); + + String query = "select col1 from t1"; + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + + List<IndexSegment> actual = service.prune(indexes, queryContext, stats); + + Assert.assertEquals(actual, indexes); + Assert.assertEquals(stats.getInvalidSegments(), 0); + } + + @Test + public void emptySegmentsAreNotInvalid() { + SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf); + IndexSegment indexSegment = mockIndexSegment(0, "col1", "col2"); + + SegmentPrunerStatistics stats = new SegmentPrunerStatistics(); + + List<IndexSegment> indexes = new ArrayList<>(); + indexes.add(indexSegment); + + String query = "select col1 from t1"; + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + + List<IndexSegment> actual = service.prune(indexes, queryContext, stats); + + Assert.assertEquals(actual, Collections.emptyList()); + Assert.assertEquals(stats.getInvalidSegments(), 0); + } + + @Test + public void segmentsWithoutColumnAreInvalid() { + SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf); + IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2"); + + SegmentPrunerStatistics stats = new SegmentPrunerStatistics(); + + List<IndexSegment> indexes = new ArrayList<>(); + indexes.add(indexSegment); + + String query = "select not_present from t1"; + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + + List<IndexSegment> actual = service.prune(indexes, queryContext, stats); + + Assert.assertEquals(actual, Collections.emptyList()); + Assert.assertEquals(1, stats.getInvalidSegments()); + } + + private IndexSegment mockIndexSegment(int totalDocs, String... columns) { + IndexSegment indexSegment = mock(IndexSegment.class); + when(indexSegment.getColumnNames()).thenReturn(new HashSet<>(Arrays.asList(columns))); + SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); + when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs); + when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); + return indexSegment; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org