This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0912544cce [multistage] follow up leaf scheduler (#10760) 0912544cce is described below commit 0912544cce9307462c5931abdc61131ec2d50bfa Author: Rong Rong <ro...@apache.org> AuthorDate: Sat May 13 09:53:21 2023 -0700 [multistage] follow up leaf scheduler (#10760) This is a follow-up on comments on #10711 * with the execution model change, mailbox send no longer needs to do a while loop * with the changes to the leaf-stage operator, a separate leaf scheduler is no longer needed. * also fix the yielding-scheduler naming in the test --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/query/runtime/QueryRunner.java | 7 +- .../runtime/executor/LeafSchedulerService.java | 102 --------------------- .../runtime/executor/OpChainSchedulerService.java | 10 +- .../runtime/executor/RoundRobinScheduler.java | 2 +- .../runtime/operator/MailboxSendOperator.java | 29 +++--- ...eTest.java => OpChainSchedulerServiceTest.java} | 2 +- .../runtime/operator/MailboxSendOperatorTest.java | 8 +- 7 files changed, 31 insertions(+), 129 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 0ff209b904..92bf32638d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -45,7 +45,6 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; -import org.apache.pinot.query.runtime.executor.LeafSchedulerService; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.executor.RoundRobinScheduler; import 
org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator; @@ -91,7 +90,7 @@ public class QueryRunner { private ExecutorService _queryRunnerExecutorService; private OpChainSchedulerService _intermScheduler; - private LeafSchedulerService _leafScheduler; + private OpChainSchedulerService _leafScheduler; /** * Initializes the query executor. @@ -117,7 +116,7 @@ public class QueryRunner { new NamedThreadFactory("query_runner_on_" + _port + "_port")); _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryWorkerIntermExecutorService()); - _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService()); + _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService()); _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); @@ -131,6 +130,7 @@ public class QueryRunner { _helixPropertyStore = _helixManager.getHelixPropertyStore(); _mailboxService.start(); _serverExecutor.start(); + _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); } @@ -138,6 +138,7 @@ public class QueryRunner { throws TimeoutException { _serverExecutor.shutDown(); _mailboxService.shutdown(); + _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java deleted file mode 100644 index faa4bf4945..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java +++ /dev/null @@ 
-1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.runtime.executor; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import org.apache.pinot.core.util.trace.TraceRunnable; -import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.operator.OpChain; -import org.apache.pinot.query.runtime.operator.OpChainId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class LeafSchedulerService { - private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class); - - private final ExecutorService _executorService; - private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap; - - public LeafSchedulerService(ExecutorService executorService) { - _executorService = executorService; - _submittedOpChainMap = new ConcurrentHashMap<>(); - } - - public void register(OpChain operatorChain) { - Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() { - @Override - public void runJob() { - boolean isFinished = false; - 
boolean returnedErrorBlock = false; - Throwable thrown = null; - try { - LOGGER.trace("({}): Executing", operatorChain); - operatorChain.getStats().executing(); - TransferableBlock result = operatorChain.getRoot().nextBlock(); - while (!result.isEndOfStreamBlock()) { - result = operatorChain.getRoot().nextBlock(); - } - isFinished = true; - if (result.isErrorBlock()) { - returnedErrorBlock = true; - LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(), - result.getDataBlock().getExceptions()); - } else { - LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats()); - } - } catch (Exception e) { - LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e); - thrown = e; - } finally { - if (returnedErrorBlock || thrown != null) { - cancelOpChain(operatorChain, thrown); - } else if (isFinished) { - closeOpChain(operatorChain); - } - } - } - }); - _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture); - } - - public void cancel(long requestId) { - // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up - // via query timeout. 
- List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet() - .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList()); - for (OpChainId opChainId : opChainIdsToCancel) { - Future<?> future = _submittedOpChainMap.get(opChainId); - if (future != null) { - future.cancel(true); - } - } - } - - private void closeOpChain(OpChain opChain) { - opChain.close(); - } - - private void cancelOpChain(OpChain opChain, Throwable t) { - opChain.cancel(t); - } -} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index a886269ab9..b4e55796b7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -48,20 +48,16 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { * Default cancel signal retention, this should be set to several times larger than * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}. 
*/ - private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L; + private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L; private final OpChainScheduler _scheduler; private final ExecutorService _workerPool; - private final Cache<Long, Long> _cancelledRequests; + private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder() + .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build(); public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) { - this(scheduler, workerPool, DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS); - } - - public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long cancelRetentionMs) { _scheduler = scheduler; _workerPool = workerPool; - _cancelledRequests = CacheBuilder.newBuilder().expireAfterWrite(cancelRetentionMs, TimeUnit.MILLISECONDS).build(); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java index a1934d884a..b143cee77e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java @@ -94,7 +94,7 @@ public class RoundRobinScheduler implements OpChainScheduler { private final Supplier<Long> _ticker; private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>(); - final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet(); + private final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet(); private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>(); private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>(); diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 5fba67f735..4741d0a151 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -109,7 +109,8 @@ public class MailboxSendOperator extends MultiStageOperator { sendingMailboxes.add(mailboxService.getSendingMailbox(receiverMailboxMetadatas.getVirtualAddress(i).hostname(), receiverMailboxMetadatas.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs)); } - return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, TransferableBlockUtils::splitBlock); + return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, + TransferableBlockUtils::splitBlock); } @Override @@ -128,21 +129,21 @@ public class MailboxSendOperator extends MultiStageOperator { TransferableBlock transferableBlock; try { transferableBlock = _sourceOperator.nextBlock(); - while (!transferableBlock.isNoOpBlock()) { - if (transferableBlock.isEndOfStreamBlock()) { - if (transferableBlock.isSuccessfulEndOfStreamBlock()) { - //Stats need to be populated here because the block is being sent to the mailbox - // and the receiving opChain will not be able to access the stats from the previous opChain - TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( - OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); - _exchange.send(eosBlockWithStats); - } else { - _exchange.send(transferableBlock); - } - return transferableBlock; + if (transferableBlock.isNoOpBlock()) { + return transferableBlock; + } else if (transferableBlock.isEndOfStreamBlock()) { + if (transferableBlock.isSuccessfulEndOfStreamBlock()) { + // Stats need to be 
populated here because the block is being sent to the mailbox + // and the receiving opChain will not be able to access the stats from the previous opChain + TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( + OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + _exchange.send(eosBlockWithStats); + } else { + _exchange.send(transferableBlock); } + } else { // normal blocks + // check whether we should continue depending on exchange queue condition. _exchange.send(transferableBlock); - transferableBlock = _sourceOperator.nextBlock(); } } catch (Exception e) { transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java similarity index 99% rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 4f7f8e5565..fbbab9e4d9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -41,7 +41,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.clearInvocations; -public class YieldingSchedulerServiceTest { +public class OpChainSchedulerServiceTest { private ExecutorService _executor; private AutoCloseable _mocks; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 5080ad8454..1d13e5f097 100644 --- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -151,7 +151,13 @@ public class MailboxSendOperatorTest { TransferableBlock block = mailboxSendOperator.nextBlock(); // Then: - assertSame(block, eosBlock, "expected EOS block to propagate"); + assertSame(block, dataBlock, "expected data block to propagate first"); + + // When: + block = mailboxSendOperator.nextBlock(); + + // Then: + assertSame(block, eosBlock, "expected EOS block to propagate next"); ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class); verify(_exchange, times(2)).send(captor.capture()); List<TransferableBlock> blocks = captor.getAllValues(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org