gortiz commented on code in PR #13733: URL: https://github.com/apache/pinot/pull/13733#discussion_r1747205731
########## pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java: ########## @@ -691,4 +662,132 @@ private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrune instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), String.valueOf(prunerStats.getValuePruned())); } + + private List<IndexSegment> selectSegments(List<IndexSegment> indexSegments, QueryContext queryContext, + TimerContext timerContext, ExecutorService executorService, SegmentPrunerStatistics prunerStats) { + List<IndexSegment> selectedSegments; + if ((queryContext.getFilter() != null && queryContext.getFilter().isConstantFalse()) || ( + queryContext.getHavingFilter() != null && queryContext.getHavingFilter().isConstantFalse())) { + selectedSegments = Collections.emptyList(); + } else { + TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING); + selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats, executorService); + segmentPruneTimer.stopAndRecord(); + } + return selectedSegments; + } + + private Plan planCombineQuery(QueryContext queryContext, TimerContext timerContext, ExecutorService executorService, + @Nullable ResultsBlockStreamer streamer, List<SegmentContext> selectedSegmentContexts) { + TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN); + + Plan queryPlan; + if (streamer != null) { + queryPlan = _planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext, executorService, + streamer, _serverMetrics); + } else { + queryPlan = _planMaker.makeInstancePlan(selectedSegmentContexts, queryContext, executorService, _serverMetrics); + } + planBuildTimer.stopAndRecord(); + return queryPlan; + } + + private InstanceResponseBlock execute(TableDataManager tableDataManager, + List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext, + ExecutorService executorService, ResultsBlockStreamer streamer, boolean enableStreaming, + List<IndexSegment> selectedSegments, List<SegmentContext> selectedSegmentContexts) + throws TimeoutException { + InstanceResponseBlock instanceResponse; + @Nullable + ResultsBlockStreamer actualStreamer = enableStreaming ? streamer : null; + switch (queryContext.getExplain()) { + case DESCRIPTION: + instanceResponse = executeDescribeExplain(indexSegments, queryContext, timerContext, executorService, + actualStreamer, selectedSegmentContexts); + break; + case NODE: + instanceResponse = executeNodeExplain(queryContext, timerContext, executorService, actualStreamer, + selectedSegmentContexts); + break; + case NONE: + instanceResponse = executeQuery(queryContext, timerContext, executorService, actualStreamer, + selectedSegmentContexts); + break; + default: + throw new IllegalStateException("Unsupported explain mode: " + queryContext.getExplain()); + } + return instanceResponse; + } + + private InstanceResponseBlock executeQuery(QueryContext queryContext, TimerContext timerContext, + ExecutorService executorService, @Nullable ResultsBlockStreamer streamer, + List<SegmentContext> selectedSegmentContexts) + throws TimeoutException { + if (selectedSegmentContexts.isEmpty()) { + return new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext)); + } + Plan queryPlan = planCombineQuery(queryContext, timerContext, executorService, streamer, selectedSegmentContexts); + + TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); + InstanceResponseBlock instanceResponse = queryPlan.execute(); + planExecTimer.stopAndRecord(); + return instanceResponse; + } + + private InstanceResponseBlock executeNodeExplain(QueryContext queryContext, TimerContext timerContext, + ExecutorService executorService, @Nullable ResultsBlockStreamer streamer, + List<SegmentContext> selectedSegmentContexts) { + + if (selectedSegmentContexts.isEmpty()) { + ExplainInfo emptyNode = new ExplainInfo(ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER); + ExplainV2ResultBlock explainResults = new ExplainV2ResultBlock(queryContext, emptyNode); + + return new InstanceResponseBlock(explainResults); + } + + Plan queryPlan = planCombineQuery(queryContext, timerContext, executorService, streamer, + selectedSegmentContexts); + + TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); + + InstanceResponseBlock instanceResponse; + InstanceResponseOperator responseOperator = (InstanceResponseOperator) queryPlan.getPlanNode().run(); + try { + responseOperator.prefetchAll(); + + ExplainInfo explainInfo = responseOperator.getExplainInfo(); + ExplainV2ResultBlock block = new ExplainV2ResultBlock(queryContext, explainInfo); + + instanceResponse = new InstanceResponseBlock(block); + // TODO: Study if this metadata can/should be added Review Comment: Do not forget about this TODO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org