klsince commented on code in PR #10660: URL: https://github.com/apache/pinot/pull/10660#discussion_r1175579771
########## pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java: ########## @@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) } FilterContext filter = Objects.requireNonNull(query.getFilter()); ValueCache cachedValues = new ValueCache(); + Map<String, DataSource> dataSourceCache = new HashMap<>(); int numSegments = segments.size(); List<IndexSegment> selectedSegments = new ArrayList<>(numSegments); - if (query.isEnablePrefetch()) { - FetchContext[] fetchContexts = new FetchContext[numSegments]; - try { - // Prefetch bloom filter for columns within the EQ/IN predicate if exists - for (int i = 0; i < numSegments; i++) { - IndexSegment segment = segments.get(i); - FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); - if (!fetchContext.isEmpty()) { - segment.prefetch(fetchContext); - fetchContexts[i] = fetchContext; - } + if (!query.isEnablePrefetch()) { + for (IndexSegment segment : segments) { + dataSourceCache.clear(); + if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) { + selectedSegments.add(segment); + } + } + return selectedSegments; + } + FetchContext[] fetchContexts = new FetchContext[numSegments]; + try { + // Prefetch bloom filter for columns within the EQ/IN predicate if exists + for (int i = 0; i < numSegments; i++) { + IndexSegment segment = segments.get(i); + FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); + if (!fetchContext.isEmpty()) { + segment.prefetch(fetchContext); + fetchContexts[i] = fetchContext; + } + } + // Prune segments + for (int i = 0; i < numSegments; i++) { + dataSourceCache.clear(); + IndexSegment segment = segments.get(i); + if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, dataSourceCache, cachedValues)) { + selectedSegments.add(segment); } - // Prune segments - Map[] dataSourceCaches = new Map[numSegments]; - for (int i = 0; i < numSegments; i++) { - 
dataSourceCaches[i] = new HashMap<>(); - IndexSegment segment = segments.get(i); - FetchContext fetchContext = fetchContexts[i]; - if (fetchContext != null) { - segment.acquire(fetchContext); - try { - if (!pruneSegment(segment, filter, dataSourceCaches[i], cachedValues)) { + } + return selectedSegments; + } finally { + // Release the prefetched bloom filters + for (int i = 0; i < numSegments; i++) { + FetchContext fetchContext = fetchContexts[i]; + if (fetchContext != null) { + segments.get(i).release(fetchContext); + } + } + } + } + + @Override + public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, + @Nullable ExecutorService executorService) { + if (segments.isEmpty()) { + return segments; + } + if (executorService == null) { + return prune(segments, query); + } + // With executor service, the pruning is done in parallel. + if (!query.isEnablePrefetch()) { + return parallelPrune(segments, query, executorService, null); + } + int numSegments = segments.size(); + FetchContext[] fetchContexts = new FetchContext[numSegments]; + try { + // Prefetch bloom filter for columns within the EQ/IN predicate if exists + for (int i = 0; i < numSegments; i++) { + IndexSegment segment = segments.get(i); + FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); + if (!fetchContext.isEmpty()) { + segment.prefetch(fetchContext); + fetchContexts[i] = fetchContext; + } + } + return parallelPrune(segments, query, executorService, fetchContexts); + } finally { + // Release the prefetched bloom filters + for (int i = 0; i < numSegments; i++) { + FetchContext fetchContext = fetchContexts[i]; + if (fetchContext != null) { + segments.get(i).release(fetchContext); + } + } + } + } + + private List<IndexSegment> parallelPrune(List<IndexSegment> segments, QueryContext queryContext, + ExecutorService executorService, FetchContext[] fetchContexts) { + int numSegments = segments.size(); + int numTasks = 
CombineOperatorUtils.getNumTasksForQuery(numSegments, queryContext.getMaxExecutionThreads()); Review Comment: Yup, I'll try to extract a util method. ########## pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java: ########## @@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) } FilterContext filter = Objects.requireNonNull(query.getFilter()); ValueCache cachedValues = new ValueCache(); + Map<String, DataSource> dataSourceCache = new HashMap<>(); int numSegments = segments.size(); List<IndexSegment> selectedSegments = new ArrayList<>(numSegments); - if (query.isEnablePrefetch()) { - FetchContext[] fetchContexts = new FetchContext[numSegments]; - try { - // Prefetch bloom filter for columns within the EQ/IN predicate if exists - for (int i = 0; i < numSegments; i++) { - IndexSegment segment = segments.get(i); - FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, query); - if (!fetchContext.isEmpty()) { - segment.prefetch(fetchContext); - fetchContexts[i] = fetchContext; - } + if (!query.isEnablePrefetch()) { Review Comment: Makes sense! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org