klsince commented on code in PR #10660:
URL: https://github.com/apache/pinot/pull/10660#discussion_r1175579771


##########
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java:
##########
@@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> 
segments, QueryContext query)
     }
     FilterContext filter = Objects.requireNonNull(query.getFilter());
     ValueCache cachedValues = new ValueCache();
+    Map<String, DataSource> dataSourceCache = new HashMap<>();
     int numSegments = segments.size();
     List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
-    if (query.isEnablePrefetch()) {
-      FetchContext[] fetchContexts = new FetchContext[numSegments];
-      try {
-        // Prefetch bloom filter for columns within the EQ/IN predicate if 
exists
-        for (int i = 0; i < numSegments; i++) {
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = 
_fetchPlanner.planFetchForPruning(segment, query);
-          if (!fetchContext.isEmpty()) {
-            segment.prefetch(fetchContext);
-            fetchContexts[i] = fetchContext;
-          }
+    if (!query.isEnablePrefetch()) {
+      for (IndexSegment segment : segments) {
+        dataSourceCache.clear();
+        if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
+        }
+      }
+      return selectedSegments;
+    }
+    FetchContext[] fetchContexts = new FetchContext[numSegments];
+    try {
+      // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+      for (int i = 0; i < numSegments; i++) {
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, 
query);
+        if (!fetchContext.isEmpty()) {
+          segment.prefetch(fetchContext);
+          fetchContexts[i] = fetchContext;
+        }
+      }
+      // Prune segments
+      for (int i = 0; i < numSegments; i++) {
+        dataSourceCache.clear();
+        IndexSegment segment = segments.get(i);
+        if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, 
dataSourceCache, cachedValues)) {
+          selectedSegments.add(segment);
         }
-        // Prune segments
-        Map[] dataSourceCaches = new Map[numSegments];
-        for (int i = 0; i < numSegments; i++) {
-          dataSourceCaches[i] = new HashMap<>();
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = fetchContexts[i];
-          if (fetchContext != null) {
-            segment.acquire(fetchContext);
-            try {
-              if (!pruneSegment(segment, filter, dataSourceCaches[i], 
cachedValues)) {
+      }
+      return selectedSegments;
+    } finally {
+      // Release the prefetched bloom filters
+      for (int i = 0; i < numSegments; i++) {
+        FetchContext fetchContext = fetchContexts[i];
+        if (fetchContext != null) {
+          segments.get(i).release(fetchContext);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext 
query,
+      @Nullable ExecutorService executorService) {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    if (executorService == null) {
+      return prune(segments, query);
+    }
+    // With executor service, the pruning is done in parallel.
+    if (!query.isEnablePrefetch()) {
+      return parallelPrune(segments, query, executorService, null);
+    }
+    int numSegments = segments.size();
+    FetchContext[] fetchContexts = new FetchContext[numSegments];
+    try {
+      // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+      for (int i = 0; i < numSegments; i++) {
+        IndexSegment segment = segments.get(i);
+        FetchContext fetchContext = _fetchPlanner.planFetchForPruning(segment, 
query);
+        if (!fetchContext.isEmpty()) {
+          segment.prefetch(fetchContext);
+          fetchContexts[i] = fetchContext;
+        }
+      }
+      return parallelPrune(segments, query, executorService, fetchContexts);
+    } finally {
+      // Release the prefetched bloom filters
+      for (int i = 0; i < numSegments; i++) {
+        FetchContext fetchContext = fetchContexts[i];
+        if (fetchContext != null) {
+          segments.get(i).release(fetchContext);
+        }
+      }
+    }
+  }
+
+  private List<IndexSegment> parallelPrune(List<IndexSegment> segments, 
QueryContext queryContext,
+      ExecutorService executorService, FetchContext[] fetchContexts) {
+    int numSegments = segments.size();
+    int numTasks = CombineOperatorUtils.getNumTasksForQuery(numSegments, 
queryContext.getMaxExecutionThreads());

Review Comment:
   Yup, I'll try to extract this into a util method.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java:
##########
@@ -94,60 +103,159 @@ public List<IndexSegment> prune(List<IndexSegment> 
segments, QueryContext query)
     }
     FilterContext filter = Objects.requireNonNull(query.getFilter());
     ValueCache cachedValues = new ValueCache();
+    Map<String, DataSource> dataSourceCache = new HashMap<>();
     int numSegments = segments.size();
     List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
-    if (query.isEnablePrefetch()) {
-      FetchContext[] fetchContexts = new FetchContext[numSegments];
-      try {
-        // Prefetch bloom filter for columns within the EQ/IN predicate if 
exists
-        for (int i = 0; i < numSegments; i++) {
-          IndexSegment segment = segments.get(i);
-          FetchContext fetchContext = 
_fetchPlanner.planFetchForPruning(segment, query);
-          if (!fetchContext.isEmpty()) {
-            segment.prefetch(fetchContext);
-            fetchContexts[i] = fetchContext;
-          }
+    if (!query.isEnablePrefetch()) {

Review Comment:
   Makes sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to