Jackie-Jiang commented on code in PR #10660:
URL: https://github.com/apache/pinot/pull/10660#discussion_r1174053206


##########
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java:
##########
@@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> 
segments, QueryContext query)
     }
     FilterContext filter = Objects.requireNonNull(query.getFilter());
     ValueCache cachedValues = new ValueCache();
+    Map<String, DataSource> dataSourceCache = new HashMap<>();
     int numSegments = segments.size();
     List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
-    if (query.isEnablePrefetch()) {
-      FetchContext[] fetchContexts = new FetchContext[numSegments];
-      try {
-        // Prefetch bloom filter for columns within the EQ/IN predicate if 
exists
-        for (int i = 0; i < numSegments; i++) {
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = 
_fetchPlanner.planFetchForPruning(segment, query);
-          if (!fetchContext.isEmpty()) {
-            segment.prefetch(fetchContext);
-            fetchContexts[i] = fetchContext;
-          }
+    if (!query.isEnablePrefetch()) {
+      for (IndexSegment segment : segments) {
+        dataSourceCache.clear();
+        if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
+        }
+      }
+      return selectedSegments;
+    }
+    FetchContext[] fetchContexts = new FetchContext[numSegments];
+    try {
+      // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+      for (int i = 0; i < numSegments; i++) {
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, 
query);
+        if (!fetchContext.isEmpty()) {
+          segment.prefetch(fetchContext);
+          fetchContexts[i] = fetchContext;
+        }
+      }
+      // Prune segments
+      for (int i = 0; i < numSegments; i++) {
+        dataSourceCache.clear();
+        IndexSegment segment = segments.get(i);
+        if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, 
dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
         }
-        // Prune segments
-        Map[] dataSourceCaches = new Map[numSegments];
-        for (int i = 0; i < numSegments; i++) {
-          dataSourceCaches[i] = new HashMap<>();
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = fetchContexts[i];
-          if (fetchContext != null) {
-            segment.acquire(fetchContext);
-            try {
-              if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
+      }
+      return selectedSegments;
+    } finally {
+      // Release the prefetched bloom filters
+      for (int i = 0; i < numSegments; i++) {
+        FetchContext fetchContext = fetchContexts[i];
+        if (fetchContext != null) {
+          segments.get(i).release(fetchContext);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query,
+      @Nullable ExecutorService executorService) {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    if (executorService == null) {
+      return prune(segments, query);
+    }
+    // With executor service, the pruning is done in parallel.
+    if (!query.isEnablePrefetch()) {
+      return parallelPrune(segments, query, executorService, null);
+    }
+    int numSegments = segments.size();
+    FetchContext[] fetchContexts = new FetchContext[numSegments];
+    try {
+      // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+      for (int i = 0; i < numSegments; i++) {
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, 
query);
+        if (!fetchContext.isEmpty()) {
+          segment.prefetch(fetchContext);
+          fetchContexts[i] = fetchContext;
+        }
+      }
+      return parallelPrune(segments, query, executorService, fetchContexts);
+    } finally {
+      // Release the prefetched bloom filters
+      for (int i = 0; i < numSegments; i++) {
+        FetchContext fetchContext = fetchContexts[i];
+        if (fetchContext != null) {
+          segments.get(i).release(fetchContext);
+        }
+      }
+    }
+  }
+
+  private List<IndexSegment> parallelPrune(List<IndexSegment> segments, 
QueryContext queryContext,
+      ExecutorService executorService, FetchContext[] fetchContexts) {
+    int numSegments = segments.size();
+    int numTasks = CombineOperatorUtils.getNumTasksForQuery(numSegments, 
queryContext.getMaxExecutionThreads());

Review Comment:
   Suggest determining `numTasks` the same way `CombinePlanNode` does. 
We can extract the common logic into a shared utility method.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java:
##########
@@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> 
segments, QueryContext query)
     }
     FilterContext filter = Objects.requireNonNull(query.getFilter());
     ValueCache cachedValues = new ValueCache();
+    Map<String, DataSource> dataSourceCache = new HashMap<>();
     int numSegments = segments.size();
     List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
-    if (query.isEnablePrefetch()) {
-      FetchContext[] fetchContexts = new FetchContext[numSegments];
-      try {
-        // Prefetch bloom filter for columns within the EQ/IN predicate if 
exists
-        for (int i = 0; i < numSegments; i++) {
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = 
_fetchPlanner.planFetchForPruning(segment, query);
-          if (!fetchContext.isEmpty()) {
-            segment.prefetch(fetchContext);
-            fetchContexts[i] = fetchContext;
-          }
+    if (!query.isEnablePrefetch()) {

Review Comment:
   Suggest making a separate pruner for the bloom filter. The bloom filter can 
only be applied when there is an EQ predicate, or an IN predicate with fewer than 
`_inPredicateThreshold` values. We should probably always use parallel pruning 
for the bloom filter since it involves loading the index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to