This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7ab0af00a1 [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711) 7ab0af00a1 is described below commit 7ab0af00a1179b011a8ab9211f28eeb318ea336c Author: Rong Rong <ro...@apache.org> AuthorDate: Mon May 8 21:23:27 2023 -0700 [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711) * Use cached threadpool to avoid sending starvation * Separate leaf stage to a different scheduler --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/query/runtime/QueryRunner.java | 117 +++++++-------------- .../runtime/executor/LeafSchedulerService.java | 102 ++++++++++++++++++ .../LeafStageTransferableBlockOperator.java | 58 ++++++---- .../apache/pinot/query/service/QueryServer.java | 6 +- .../apache/pinot/query/QueryServerEnclosure.java | 3 +- ...Test.java => YieldingSchedulerServiceTest.java} | 2 +- .../LeafStageTransferableBlockOperatorTest.java | 82 ++++++++++++--- .../pinot/query/runtime/operator/OpChainTest.java | 7 +- 8 files changed, 254 insertions(+), 123 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index ced674d269..8876a950e3 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -35,7 +36,6 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; -import org.apache.pinot.core.operator.combine.BaseCombineOperator; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; @@ -45,7 +45,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.routing.PlanFragmentMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; -import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.executor.LeafSchedulerService; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.executor.RoundRobinScheduler; import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator; @@ -85,34 +85,13 @@ public class QueryRunner { private String _hostname; private int _port; private VirtualServerAddress _rootServer; - // Query worker threads are used for (1) running intermediate stage operators (2) running segment level operators - /** - * Query worker threads are used for: - * <ol> - * <li> - * Running intermediate stage operators (v2 engine operators). - * </li> - * <li> - * Running per-segment operators submitted in {@link BaseCombineOperator}. - * </li> - * </ol> - */ + private ExecutorService _queryWorkerIntermExecutorService; private ExecutorService _queryWorkerLeafExecutorService; - /** - * Query runner threads are used for: - * <ol> - * <li> - * Merging results in BaseCombineOperator for leaf stages. Results are provided by per-segment operators run in - * worker threads - * </li> - * <li> - * Building the OperatorChain and submitting to the scheduler for non-leaf stages (intermediate stages). - * </li> - * </ol> - */ private ExecutorService _queryRunnerExecutorService; - private OpChainSchedulerService _scheduler; + + private OpChainSchedulerService _intermScheduler; + private LeafSchedulerService _leafScheduler; /** * Initializes the query executor. @@ -130,15 +109,16 @@ public class QueryRunner { try { long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS); - _queryWorkerIntermExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, + _queryWorkerIntermExecutorService = Executors.newCachedThreadPool( new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port")); _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port")); - _queryRunnerExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, + _queryRunnerExecutorService = Executors.newCachedThreadPool( new NamedThreadFactory("query_runner_on_" + _port + "_port")); - _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), + _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryWorkerIntermExecutorService()); - _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable); + _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService()); + _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); } catch (Exception e) { @@ -151,14 +131,14 @@ public class QueryRunner { _helixPropertyStore = _helixManager.getHelixPropertyStore(); _mailboxService.start(); _serverExecutor.start(); - _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); + _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); } public void shutDown() throws TimeoutException { _serverExecutor.shutDown(); _mailboxService.shutdown(); - _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); + _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); } public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) { @@ -168,18 +148,20 @@ public class QueryRunner { Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); long deadlineMs = System.currentTimeMillis() + timeoutMs; if (isLeafStage(distributedStagePlan)) { - runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, requestId); + OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, + requestId); + _leafScheduler.register(rootOperator); } else { PlanNode stageRoot = distributedStagePlan.getStageRoot(); OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled)); - _scheduler.register(rootOperator); + _intermScheduler.register(rootOperator); } } public void cancel(long requestId) { - _scheduler.cancel(requestId); + _intermScheduler.cancel(requestId); } @VisibleForTesting @@ -196,7 +178,7 @@ public class QueryRunner { return _queryRunnerExecutorService; } - private void runLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, + private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, long timeoutMs, long deadlineMs, long requestId) { boolean isTraceEnabled = Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); @@ -208,9 +190,19 @@ public class QueryRunner { serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(), new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis())); } - getQueryRunnerExecutorService().submit(() -> { - processServerQuery(requestId, timeoutMs, deadlineMs, isTraceEnabled, distributedStagePlan, serverQueryRequests); - }); + MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); + OpChainExecutionContext opChainExecutionContext = + new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(), + distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(), + isTraceEnabled); + MultiStageOperator leafStageOperator = + new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest, + serverQueryRequests, sendNode.getDataSchema()); + MailboxSendOperator mailboxSendOperator = + new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(), + sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), + sendNode.isSortOnSender(), sendNode.getReceiverStageId()); + return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList()); } private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, @@ -249,46 +241,17 @@ public class QueryRunner { return requests; } - private void processServerQuery(long requestId, long timeoutMs, long deadlineMs, boolean isTraceEnabled, - DistributedStagePlan distributedStagePlan, List<ServerQueryRequest> serverQueryRequests) { - MailboxSendOperator mailboxSendOperator = null; + private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) { + InstanceResponseBlock result; try { - // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key) - List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size()); - for (ServerQueryRequest request : serverQueryRequests) { - InstanceResponseBlock result; - try { - result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService()); - } catch (Exception e) { - InstanceResponseBlock errorResponse = new InstanceResponseBlock(); - errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, - e.getMessage() + QueryException.getTruncatedStackTrace(e)); - result = errorResponse; - } - serverQueryResults.add(result); - } - MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); - OpChainExecutionContext opChainExecutionContext = - new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(), - distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(), - isTraceEnabled); - MultiStageOperator leafStageOperator = - new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()); - mailboxSendOperator = - new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(), - sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), - sendNode.isSortOnSender(), sendNode.getReceiverStageId()); - int blockCounter = 0; - while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) { - LOGGER.debug("Acquired transferable block: {}", blockCounter++); - } - mailboxSendOperator.close(); + result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService()); } catch (Exception e) { - LOGGER.error(String.format("Error running leafStage for requestId=%s", requestId), e); - if (mailboxSendOperator != null) { - mailboxSendOperator.cancel(e); - } + InstanceResponseBlock errorResponse = new InstanceResponseBlock(); + errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, + e.getMessage() + QueryException.getTruncatedStackTrace(e)); + result = errorResponse; } + return result; } private boolean isLeafStage(DistributedStagePlan distributedStagePlan) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java new file mode 100644 index 0000000000..faa4bf4945 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.executor; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.apache.pinot.core.util.trace.TraceRunnable; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.OpChain; +import org.apache.pinot.query.runtime.operator.OpChainId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LeafSchedulerService { + private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class); + + private final ExecutorService _executorService; + private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap; + + public LeafSchedulerService(ExecutorService executorService) { + _executorService = executorService; + _submittedOpChainMap = new ConcurrentHashMap<>(); + } + + public void register(OpChain operatorChain) { + Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() { + @Override + public void runJob() { + boolean isFinished = false; + boolean returnedErrorBlock = false; + Throwable thrown = null; + try { + LOGGER.trace("({}): Executing", operatorChain); + operatorChain.getStats().executing(); + TransferableBlock result = operatorChain.getRoot().nextBlock(); + while (!result.isEndOfStreamBlock()) { + result = operatorChain.getRoot().nextBlock(); + } + isFinished = true; + if (result.isErrorBlock()) { + returnedErrorBlock = true; + LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(), + result.getDataBlock().getExceptions()); + } else { + LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats()); + } + } catch (Exception e) { + LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e); + thrown = e; + } finally { + if (returnedErrorBlock || thrown != null) { + cancelOpChain(operatorChain, thrown); + } else if (isFinished) { + closeOpChain(operatorChain); + } + } + } + }); + _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture); + } + + public void cancel(long requestId) { + // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up + // via query timeout. + List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet() + .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList()); + for (OpChainId opChainId : opChainIdsToCancel) { + Future<?> future = _submittedOpChainMap.get(opChainId); + if (future != null) { + future.cancel(true); + } + } + } + + private void closeOpChain(OpChain opChain) { + opChain.close(); + } + + private void cancelOpChain(OpChain opChain, Throwable t) { + opChain.cancel(t); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 5656baebe8..64e9117a92 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -24,8 +24,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; +import java.util.function.Function; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; @@ -38,6 +40,7 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; +import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; @@ -62,22 +65,19 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR"; private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class); - private final InstanceResponseBlock _errorBlock; - private final List<InstanceResponseBlock> _baseResultBlock; + private final LinkedList<ServerQueryRequest> _serverQueryRequestQueue; private final DataSchema _desiredDataSchema; - private int _currentIndex; + private final Function<ServerQueryRequest, InstanceResponseBlock> _processCall; + + private InstanceResponseBlock _errorBlock; public LeafStageTransferableBlockOperator(OpChainExecutionContext context, - List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) { + Function<ServerQueryRequest, InstanceResponseBlock> processCall, + List<ServerQueryRequest> serverQueryRequestList, DataSchema dataSchema) { super(context); - _baseResultBlock = baseResultBlock; + _processCall = processCall; + _serverQueryRequestQueue = new LinkedList<>(serverQueryRequestList); _desiredDataSchema = dataSchema; - _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null); - _currentIndex = 0; - for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) { - OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId()); - operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata()); - } } @Override @@ -93,27 +93,39 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { @Override protected TransferableBlock getNextBlock() { - if (_currentIndex < 0) { + if (_errorBlock != null) { throw new RuntimeException("Leaf transfer terminated. next block should no longer be called."); } - if (_errorBlock != null) { - _currentIndex = -1; + // runLeafStage + InstanceResponseBlock responseBlock = getNextBlockFromLeafStage(); + if (responseBlock == null) { + // finished getting next block from leaf stage. returning EOS + return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); + } else if (!responseBlock.getExceptions().isEmpty()) { + // get error from leaf stage, return ERROR + _errorBlock = responseBlock; return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions())); } else { - if (_currentIndex < _baseResultBlock.size()) { - InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++); - if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) { - return composeTransferableBlock(responseBlock, _desiredDataSchema); - } else { - return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW); - } + // return normal block. + OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId()); + operatorStats.recordExecutionStats(responseBlock.getResponseMetadata()); + if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) { + return composeTransferableBlock(responseBlock, _desiredDataSchema); } else { - _currentIndex = -1; - return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); + return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW); } } } + private @Nullable InstanceResponseBlock getNextBlockFromLeafStage() { + if (!_serverQueryRequestQueue.isEmpty()) { + ServerQueryRequest request = _serverQueryRequestQueue.pop(); + return _processCall.apply(request); + } else { + return null; + } + } + /** * Leaf stage operators should always collect stats for the tables used in queries * Otherwise the Broker response will just contain zeros for every stat value diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java index 8e7061bff8..4cd92ab32e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java @@ -22,9 +22,7 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.io.IOException; import java.util.Map; -import java.util.concurrent.TimeoutException; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; @@ -64,7 +62,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { } _queryRunner.start(); _server.start(); - } catch (IOException | TimeoutException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -77,7 +75,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { _server.shutdown(); _server.awaitTermination(); } - } catch (InterruptedException | TimeoutException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 36df3de6ff..75401ca07f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -20,7 +20,6 @@ package org.apache.pinot.query; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeoutException; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -119,7 +118,7 @@ public class QueryServerEnclosure { public void shutDown() { try { _queryRunner.shutDown(); - } catch (TimeoutException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java similarity index 99% rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java index fbbab9e4d9..4f7f8e5565 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java @@ -41,7 +41,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.clearInvocations; -public class OpChainSchedulerServiceTest { +public class YieldingSchedulerServiceTest { private ExecutorService _executor; private AutoCloseable _mocks; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java index 9d04d07df9..dc2d2dcb75 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.annotations.VisibleForTesting; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.table.Record; @@ -32,6 +35,7 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; import org.apache.pinot.core.query.distinct.DistinctTable; +import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.query.routing.VirtualServerAddress; @@ -75,7 +79,8 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -101,7 +106,9 @@ public class LeafStageTransferableBlockOperatorTest { new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, desiredSchema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), + desiredSchema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -123,7 +130,8 @@ public class LeafStageTransferableBlockOperatorTest { new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -148,7 +156,8 @@ public class LeafStageTransferableBlockOperatorTest { queryContext), new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock1 = operator.nextBlock(); @@ -178,12 +187,17 @@ public class LeafStageTransferableBlockOperatorTest { errorBlock, new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); - // Then: + Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{"foo", 1}); + Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{"", 2}); + + // When: + resultBlock = operator.nextBlock(); Assert.assertTrue(resultBlock.isErrorBlock()); } @@ -199,7 +213,8 @@ public class LeafStageTransferableBlockOperatorTest { new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -220,7 +235,8 @@ public class LeafStageTransferableBlockOperatorTest { new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -244,7 +260,8 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -267,7 +284,8 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -286,7 +304,8 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -308,7 +327,9 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> responseBlockList = Collections.singletonList( new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()), + desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -331,7 +352,9 @@ public class LeafStageTransferableBlockOperatorTest { new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(resultSchema, Collections.emptyList())), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()), + desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -353,11 +376,42 @@ public class LeafStageTransferableBlockOperatorTest { List<InstanceResponseBlock> responseBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), + getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()), + desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: Assert.assertEquals(resultBlock.getContainer().size(), 0); Assert.assertEquals(resultBlock.getDataSchema(), desiredSchema); } + + @VisibleForTesting + static Function<ServerQueryRequest, InstanceResponseBlock> getStaticBlockProcessor( + List<InstanceResponseBlock> resultBlockList) { + return new StaticBlockProcessor(resultBlockList)::process; + } + + static List<ServerQueryRequest> getStaticServerQueryRequests(int count) { + List<ServerQueryRequest> staticMockRequests = new ArrayList<>(); + while (count > 0) { + staticMockRequests.add(mock(ServerQueryRequest.class)); + count--; + } + return staticMockRequests; + } + + private static class StaticBlockProcessor { + private final List<InstanceResponseBlock> _resultBlockList; + private int _currentIdx; + + StaticBlockProcessor(List<InstanceResponseBlock> resultBlockList) { + _resultBlockList = resultBlockList; + _currentIdx = 0; + } + + public InstanceResponseBlock process(ServerQueryRequest request) { + return _resultBlockList.get(_currentIdx++); + } + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 3ec9fc71de..585e884e65 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; +import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.query.mailbox.MailboxService; @@ -57,6 +58,7 @@ import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -286,8 +288,9 @@ public class OpChainTest { QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl"); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext)); - LeafStageTransferableBlockOperator leafOp = - new LeafStageTransferableBlockOperator(context, resultsBlockList, upStreamSchema); + LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context, + LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList), + Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema); //Transform operator RexExpression.InputRef ref0 = new RexExpression.InputRef(0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org