Jackie-Jiang commented on code in PR #10660: URL: https://github.com/apache/pinot/pull/10660#discussion_r1174053206
########## pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java: ########## @@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) } FilterContext filter = Objects.requireNonNull(query.getFilter()); ValueCache cachedValues = new ValueCache(); + Map<String, DataSource> dataSourceCache = new HashMap<>(); int numSegments = segments.size(); List<IndexSegment> selectedSegments = new ArrayList<>(numSegments); - if (query.isEnablePrefetch()) { - FetchContext[] fetchContexts = new FetchContext[numSegments]; - try { - // Prefetch bloom filter for columns within the EQ/IN predicate if exists - for (int i = 0; i < numSegments; i++) { - IndexSegment segment = segments.get(i); - FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); - if (!fetchContext.isEmpty()) { - segment.prefetch(fetchContext); - fetchContexts[i] = fetchContext; - } + if (!query.isEnablePrefetch()) { + for (IndexSegment segment : segments) { + dataSourceCache.clear(); + if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) { + selectedSegments.add(segment); + } + } + return selectedSegments; + } + FetchContext[] fetchContexts = new FetchContext[numSegments]; + try { + // Prefetch bloom filter for columns within the EQ/IN predicate if exists + for (int i = 0; i < numSegments; i++) { + IndexSegment segment = segments.get(i); + FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); + if (!fetchContext.isEmpty()) { + segment.prefetch(fetchContext); + fetchContexts[i] = fetchContext; + } + } + // Prune segments + for (int i = 0; i < numSegments; i++) { + dataSourceCache.clear(); + IndexSegment segment = segments.get(i); + if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, dataSourceCache, cachedValues)) { + selectedSegments.add(segment); } - // Prune segments - Map[] dataSourceCaches = new Map[numSegments]; - for (int i = 0; i < numSegments; i++) { - 
dataSourceCaches[i] = new HashMap<>(); - IndexSegment segment = segments.get(i); - FetchContext fetchContext = fetchContexts[i]; - if (fetchContext != null) { - segment.acquire(fetchContext); - try { - if (!pruneSegment(segment, filter, dataSourceCaches[i], cachedValues)) { + } + return selectedSegments; + } finally { + // Release the prefetched bloom filters + for (int i = 0; i < numSegments; i++) { + FetchContext fetchContext = fetchContexts[i]; + if (fetchContext != null) { + segments.get(i).release(fetchContext); + } + } + } + } + + @Override + public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, + @Nullable ExecutorService executorService) { + if (segments.isEmpty()) { + return segments; + } + if (executorService == null) { + return prune(segments, query); + } + // With executor service, the pruning is done in parallel. + if (!query.isEnablePrefetch()) { + return parallelPrune(segments, query, executorService, null); + } + int numSegments = segments.size(); + FetchContext[] fetchContexts = new FetchContext[numSegments]; + try { + // Prefetch bloom filter for columns within the EQ/IN predicate if exists + for (int i = 0; i < numSegments; i++) { + IndexSegment segment = segments.get(i); + FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); + if (!fetchContext.isEmpty()) { + segment.prefetch(fetchContext); + fetchContexts[i] = fetchContext; + } + } + return parallelPrune(segments, query, executorService, fetchContexts); + } finally { + // Release the prefetched bloom filters + for (int i = 0; i < numSegments; i++) { + FetchContext fetchContext = fetchContexts[i]; + if (fetchContext != null) { + segments.get(i).release(fetchContext); + } + } + } + } + + private List<IndexSegment> parallelPrune(List<IndexSegment> segments, QueryContext queryContext, + ExecutorService executorService, FetchContext[] fetchContexts) { + int numSegments = segments.size(); + int numTasks = 
CombineOperatorUtils.getNumTasksForQuery(numSegments, queryContext.getMaxExecutionThreads()); Review Comment: Suggest determining `numTasks` the same way `CombinePlanNode` does. We can extract the common logic into a utility method. ########## pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java: ########## @@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) } FilterContext filter = Objects.requireNonNull(query.getFilter()); ValueCache cachedValues = new ValueCache(); + Map<String, DataSource> dataSourceCache = new HashMap<>(); int numSegments = segments.size(); List<IndexSegment> selectedSegments = new ArrayList<>(numSegments); - if (query.isEnablePrefetch()) { - FetchContext[] fetchContexts = new FetchContext[numSegments]; - try { - // Prefetch bloom filter for columns within the EQ/IN predicate if exists - for (int i = 0; i < numSegments; i++) { - IndexSegment segment = segments.get(i); - FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); - if (!fetchContext.isEmpty()) { - segment.prefetch(fetchContext); - fetchContexts[i] = fetchContext; - } + if (!query.isEnablePrefetch()) { Review Comment: Suggest making a separate pruner for the bloom filter. The bloom filter can be applied only if there is an EQ or IN predicate with fewer than `_inPredicateThreshold` values. We should probably always use parallel pruning for the bloom filter, since it involves loading the index. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org