Jackie-Jiang commented on code in PR #10006: URL: https://github.com/apache/pinot/pull/10006#discussion_r1067579880
########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/function/CombineFunction.java: ########## @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine.function; + +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; + + +public interface CombineFunction<T extends BaseResultsBlock> { Review Comment: Suggest renaming it to `ResultsBlockMerger`. `CombineFunction` seems weird to me because it is not really a function. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperator.java: ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; + + +public interface CombineOperator { Review Comment: I don't think we need a separate interface, since none of these methods are for external use. We can keep them in `BaseCombineOperator` with protected access. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java: ########## @@ -122,166 +119,52 @@ public void runJob() { } else { LOGGER.error("Caught serious error while processing query: " + _queryContext, t); } - onException(t); + onProcessSegmentsException(t); } finally { - onFinish(); - phaser.arriveAndDeregister(); + onProcessSegmentsFinish(); + _phaser.arriveAndDeregister(); Tracing.ThreadAccountantOps.clear(); } _totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs()); } }); } - - BaseResultsBlock mergedBlock; - try { - mergedBlock = mergeResults(); - } catch (InterruptedException | EarlyTerminationException e) { - Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus(); - throw new QueryCancelledException( - "Cancelled while merging results blocks" - + (killedErrorMsg == null ? 
StringUtils.EMPTY : " " + killedErrorMsg), e); - } catch (Exception e) { - LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); - mergedBlock = new ExceptionResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - // Deregister the main thread and wait for all threads done - phaser.awaitAdvance(phaser.arriveAndDeregister()); - } - /* - * _numTasks are number of async tasks submitted to the _executorService, but it does not mean Pinot server - * use those number of threads to concurrently process segments. Instead, if _executorService thread pool has - * less number of threads than _numTasks, the number of threads that used to concurrently process segments equals - * to the pool size. - * TODO: Get the actual number of query worker threads instead of using the default value. - */ - int numServerThreads = Math.min(_numTasks, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); - CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators, _totalWorkerThreadCpuTimeNs.get(), - numServerThreads); - return mergedBlock; } /** - * Executes query on one or more segments in a worker thread. + * Stop the combine operator process. This will stop all sub-tasks that were spun up to process data segments. 
*/ - protected void processSegments() { - int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { - Operator operator = _operators.get(operatorId); - T resultsBlock; - try { - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { - ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); - } - resultsBlock = (T) operator.nextBlock(); - } finally { - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { - ((AcquireReleaseColumnsSegmentOperator) operator).release(); - } - } - - if (isQuerySatisfied(resultsBlock)) { - // Query is satisfied, skip processing the remaining segments - _blockingQueue.offer(resultsBlock); - return; - } else { - _blockingQueue.offer(resultsBlock); + protected void stopProcess() { + // Cancel all ongoing jobs + for (Future future : _futures) { + if (!future.isDone()) { + future.cancel(true); } } + // Deregister the main thread and wait for all threads done + _phaser.awaitAdvance(_phaser.arriveAndDeregister()); } - /** - * Invoked when {@link #processSegments()} throws exception/error. - */ - protected void onException(Throwable t) { - _blockingQueue.offer(new ExceptionResultsBlock(t)); - } - - /** - * Invoked when {@link #processSegments()} is finished (called in the finally block). - */ - protected void onFinish() { - } - - /** - * Merges the results from the worker threads into a results block. - */ - protected BaseResultsBlock mergeResults() - throws Exception { - T mergedBlock = null; - int numBlocksMerged = 0; - long endTimeMs = _queryContext.getEndTimeMs(); - while (numBlocksMerged < _numOperators) { - // Timeout has reached, shouldn't continue to process. 
`_blockingQueue.poll` will continue to return blocks even - // if negative timeout is provided; therefore an extra check is needed - long waitTimeMs = endTimeMs - System.currentTimeMillis(); - if (waitTimeMs <= 0) { - return getTimeoutResultsBlock(numBlocksMerged); - } - BaseResultsBlock blockToMerge = _blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS); - if (blockToMerge == null) { - return getTimeoutResultsBlock(numBlocksMerged); - } - if (blockToMerge.getProcessingExceptions() != null) { - // Caught exception while processing segment, skip merging the remaining results blocks and directly return the - // exception - return blockToMerge; - } - if (mergedBlock == null) { - mergedBlock = convertToMergeableBlock((T) blockToMerge); - } else { - mergeResultsBlocks(mergedBlock, (T) blockToMerge); - } - numBlocksMerged++; - if (isQuerySatisfied(mergedBlock)) { - // Query is satisfied, skip merging the remaining results blocks - return mergedBlock; - } - } - return mergedBlock; - } - - private ExceptionResultsBlock getTimeoutResultsBlock(int numBlocksMerged) { + protected ExceptionResultsBlock getTimeoutResultsBlock(int numBlocksMerged) { LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, _queryContext); return new ExceptionResultsBlock(QueryException.EXECUTION_TIMEOUT_ERROR, new TimeoutException("Timed out while polling results block")); } - /** - * Can be overridden for early termination. The input results block might not be mergeable. - */ - protected boolean isQuerySatisfied(T resultsBlock) { - return false; + @Override + public List<Operator> getChildOperators() { + return _operators; } /** - * Merges a results block into the main mergeable results block. - * <p>NOTE: {@code blockToMerge} should contain the result for a segment without any exception. The errored segment - * result is already handled. - * - * @param mergedBlock The block that accumulates previous results. 
It should be modified to add the information of the - * other block. - * @param blockToMerge The new block that needs to be merged into the mergedBlock. + * Start the combine operator. */ - protected abstract void mergeResultsBlocks(T mergedBlock, T blockToMerge); + public abstract void start(); Review Comment: Suggest moving `start()` and `stop()` into `BaseStreamBlockCombine` because they don't apply to a single block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org