This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 58b113225a [multistage][refactor] Introduce MultiStageOperator with close() API (#10123) 58b113225a is described below commit 58b113225a3e518dd261553274c0e75abb69a29f Author: Yao Liu <y...@startree.ai> AuthorDate: Wed Jan 18 09:22:47 2023 -0800 [multistage][refactor] Introduce MultiStageOperator with close() API (#10123) * v2 operator * test * test * opchain scheduler * v2 operator * fix scheduling * add comment * address feedback * address comments * add todo * change class name * change getChildOp API --- .../org/apache/pinot/core/common/Operator.java | 2 +- .../query/executor/ServerQueryExecutorV1Impl.java | 4 +- .../query/runtime/executor/OpChainScheduler.java | 6 +++ .../runtime/executor/OpChainSchedulerService.java | 17 +++++-- .../runtime/executor/RoundRobinScheduler.java | 24 +++++++++- .../query/runtime/operator/AggregateOperator.java | 16 +++---- .../query/runtime/operator/FilterOperator.java | 14 +++--- .../query/runtime/operator/HashJoinOperator.java | 19 ++++---- .../LeafStageTransferableBlockOperator.java | 9 ++-- .../runtime/operator/LiteralValueOperator.java | 10 ++-- .../runtime/operator/MailboxReceiveOperator.java | 9 ++-- .../runtime/operator/MailboxSendOperator.java | 16 +++---- .../query/runtime/operator/MultiStageOperator.java | 49 ++++++++++++++++++++ .../pinot/query/runtime/operator/OpChain.java | 12 +++-- .../pinot/query/runtime/operator/SortOperator.java | 16 +++---- .../query/runtime/operator/TransformOperator.java | 14 +++--- .../query/runtime/plan/PhysicalPlanVisitor.java | 39 ++++++++-------- .../executor/OpChainSchedulerServiceTest.java | 54 ++++++++++++++++------ .../runtime/executor/RoundRobinSchedulerTest.java | 5 +- .../runtime/operator/AggregateOperatorTest.java | 6 +-- .../query/runtime/operator/FilterOperatorTest.java | 3 +- .../runtime/operator/HashJoinOperatorTest.java | 5 +- .../runtime/operator/MailboxSendOperatorTest.java | 3 +- .../query/runtime/operator/OperatorTestUtil.java | 3 +- .../query/runtime/operator/SortOperatorTest.java | 3 +- .../runtime/operator/TransformOperatorTest.java | 3 +- .../testutils/MockDataBlockOperatorFactory.java | 6 +-- 27 files changed, 226 insertions(+), 141 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java index 5926805c6a..f86caf9798 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java @@ -42,7 +42,7 @@ public interface Operator<T extends Block> { T nextBlock(); /** @return List of {@link Operator}s that this operator depends upon. */ - List<Operator> getChildOperators(); + List<? extends Operator> getChildOperators(); /** @return Explain Plan description if available; otherwise, null. */ @Nullable diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 92467ba316..40dff35893 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -408,7 +408,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { Map<Integer, HashSet<Integer>> uniquePlanNodeHashCodes = new HashMap<>(); // Obtain the list of all possible segment plans after the combine root node - List<Operator> children = root.getChildOperators(); + List<? extends Operator> children = root.getChildOperators(); for (Operator child : children) { int[] operatorId = {3}; ExplainPlanRows explainPlanRows = new ExplainPlanRows(); @@ -483,7 +483,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { public static InstanceResponseBlock executeExplainQuery(Plan queryPlan, QueryContext queryContext) { ExplainResultsBlock explainResults = new ExplainResultsBlock(); - List<Operator> childOperators = queryPlan.getPlanNode().run().getChildOperators(); + List<? extends Operator> childOperators = queryPlan.getPlanNode().run().getChildOperators(); assert childOperators.size() == 1; Operator root = childOperators.get(0); Map<Integer, List<ExplainPlanRows>> operatorDepthToRowDataMap; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java index 8b35772542..635e5aacfb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java @@ -44,6 +44,12 @@ public interface OpChainScheduler { */ void onDataAvailable(MailboxIdentifier mailbox); + /** + * This method is called when scheduler is terminating. It should clean up all of the resources if there are any. + * register() and onDataAvailable() shouldn't be called anymore after shutDown is called. + */ + void shutDown(); + /** * @return whether or not there is any work for the scheduler to do */ diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index e40b2dbf8f..21a944d4f7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.executor; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.Monitor; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pinot.core.util.trace.TraceRunnable; @@ -43,6 +44,8 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class); + private static final int TERMINATION_TIMEOUT_SEC = 60; + private final OpChainScheduler _scheduler; private final ExecutorService _workerPool; private final long _pollIntervalMs; @@ -56,6 +59,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { } }; + // Note that workerPool is shut down in this class. public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) { this(scheduler, workerPool, -1); } @@ -72,6 +76,10 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { // this will just notify all waiters that the scheduler is shutting down _monitor.enter(); _monitor.leave(); + if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) { + LOGGER.error("Failed to shut down and terminate OpChainScheduler."); + } + _scheduler.shutDown(); } @Override @@ -111,8 +119,10 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { } else { LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats()); } + operatorChain.close(); } } catch (Exception e) { + operatorChain.close(); LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e); } } @@ -142,9 +152,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { public final void register(OpChain operatorChain) { register(operatorChain, true); LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. " - + "There are a total of {} chains awaiting execution.", - operatorChain, - operatorChain.getReceivingMailbox(), + + "There are a total of {} chains awaiting execution.", operatorChain, operatorChain.getReceivingMailbox(), _scheduler.size()); // we want to track the time that it takes from registering @@ -156,8 +164,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { public final void register(OpChain operatorChain, boolean isNew) { _monitor.enter(); try { - LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", - operatorChain, isNew, _scheduler.size()); + LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", operatorChain, isNew, _scheduler.size()); _scheduler.register(operatorChain, isNew); operatorChain.getStats().queued(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java index 3a20bf5d6a..c7b77f85e7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java @@ -27,6 +27,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import javax.annotation.concurrent.NotThreadSafe; import org.apache.pinot.query.mailbox.MailboxIdentifier; import org.apache.pinot.query.runtime.operator.OpChain; import org.slf4j.Logger; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; * of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)} * callback. */ +@NotThreadSafe public class RoundRobinScheduler implements OpChainScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class); private static final long DEFAULT_RELEASE_TIMEOUT = TimeUnit.MINUTES.toMillis(1); @@ -53,6 +55,8 @@ public class RoundRobinScheduler implements OpChainScheduler { private final Queue<AvailableEntry> _available = new LinkedList<>(); private final Queue<OpChain> _ready = new LinkedList<>(); + private boolean _isShutDown = false; + // using a Set here is acceptable because calling hasNext() and // onDataAvailable() cannot occur concurrently - that means that // anytime we schedule a new operator based on the presence of @@ -79,6 +83,9 @@ public class RoundRobinScheduler implements OpChainScheduler { @Override public void register(OpChain operatorChain, boolean isNew) { + if (_isShutDown) { + return; + } // the first time an operator chain is scheduled, it should // immediately be considered ready in case it does not need // read from any mailbox (e.g. with a LiteralValueOperator) @@ -134,6 +141,20 @@ public class RoundRobinScheduler implements OpChainScheduler { return _ready.size() + _available.size(); } + @Override + public void shutDown() { + if (_isShutDown) { + return; + } + while (!_ready.isEmpty()) { + _ready.poll().close(); + } + while (!_available.isEmpty()) { + _available.poll()._opChain.close(); + } + _isShutDown = true; + } + private void computeReady() { Iterator<AvailableEntry> availableChains = _available.iterator(); @@ -164,8 +185,7 @@ public class RoundRobinScheduler implements OpChainScheduler { } private void trace(String operation) { - LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", - operation, _ready, _available, _seenMail); + LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", operation, _ready, _available, _seenMail); } private static class AvailableEntry { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index fe61ce244e..a82048949f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.HashMap; @@ -30,9 +31,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.Key; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -53,10 +52,10 @@ import org.apache.pinot.spi.data.FieldSpec; * Note: This class performs aggregation over the double value of input. * If the input is single value, the output type will be input type. Otherwise, the output type will be double. */ -public class AggregateOperator extends BaseOperator<TransferableBlock> { +public class AggregateOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR"; - private final Operator<TransferableBlock> _inputOperator; + private final MultiStageOperator _inputOperator; // TODO: Deal with the case where _aggCalls is empty but we have groupSet setup, which means this is a Distinct call. private final List<RexExpression.FunctionCall> _aggCalls; private final List<RexExpression> _groupSet; @@ -73,13 +72,13 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> { // aggCalls has to be a list of FunctionCall and cannot be null // groupSet has to be a list of InputRef and cannot be null // TODO: Add these two checks when we confirm we can handle error in upstream ctor call. - public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema dataSchema, + public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls, List<RexExpression> groupSet) { this(inputOperator, dataSchema, aggCalls, groupSet, AggregateOperator.Accumulator.MERGERS); } @VisibleForTesting - AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema dataSchema, + AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls, List<RexExpression> groupSet, Map<String, Merger> mergers) { _inputOperator = inputOperator; _groupSet = groupSet; @@ -107,9 +106,8 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(_inputOperator); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index 32fe4508cf..3ae9eac98f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -18,13 +18,12 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -45,14 +44,14 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; 3) All boolean scalar functions we have that take tranformOperand. Note: Scalar functions are the ones we have in v1 engine and only do function name and arg # matching. */ -public class FilterOperator extends BaseOperator<TransferableBlock> { +public class FilterOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "FILTER"; - private final Operator<TransferableBlock> _upstreamOperator; + private final MultiStageOperator _upstreamOperator; private final TransformOperand _filterOperand; private final DataSchema _dataSchema; private TransferableBlock _upstreamErrorBlock; - public FilterOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) { + public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter) { _upstreamOperator = upstreamOperator; _dataSchema = dataSchema; _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema); @@ -60,9 +59,8 @@ public class FilterOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(_upstreamOperator); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 7fe3902f61..be47c9f389 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -1,5 +1,3 @@ - - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,6 +19,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.HashMap; @@ -32,9 +31,7 @@ import javax.annotation.Nullable; import org.apache.calcite.rel.core.JoinRelType; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.Key; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.stage.JoinNode; @@ -56,7 +53,7 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; * The output is in the format of [left_row, right_row] */ // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728) -public class HashJoinOperator extends BaseOperator<TransferableBlock> { +public class HashJoinOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "HASH_JOIN"; private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL); @@ -68,8 +65,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { // TODO: Replace hashset with rolling bit map. private final HashMap<Key, HashSet<Integer>> _matchedRightRows; - private final Operator<TransferableBlock> _leftTableOperator; - private final Operator<TransferableBlock> _rightTableOperator; + private final MultiStageOperator _leftTableOperator; + private final MultiStageOperator _rightTableOperator; private final JoinRelType _joinType; private final DataSchema _resultSchema; private final int _leftRowSize; @@ -85,7 +82,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { private KeySelector<Object[], Object[]> _leftKeySelector; private KeySelector<Object[], Object[]> _rightKeySelector; - public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, Operator<TransferableBlock> rightTableOperator, + public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) { Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()), "Join type: " + node.getJoinRelType() + " is not supported!"); @@ -116,10 +113,10 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> { _upstreamErrorBlock = null; } + // TODO: Separate left and right table operator. @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(_leftTableOperator, _rightTableOperator); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 0f93cfece4..365d04f863 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -31,8 +32,6 @@ import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -56,7 +55,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; * thus requires canonicalization.</li> * </ul> */ -public class LeafStageTransferableBlockOperator extends BaseOperator<TransferableBlock> { +public class LeafStageTransferableBlockOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR"; private final InstanceResponseBlock _errorBlock; @@ -72,8 +71,8 @@ public class LeafStageTransferableBlockOperator extends BaseOperator<Transferabl } @Override - public List<Operator> getChildOperators() { - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index 9141abad9a..cace9fa974 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -18,19 +18,18 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; -public class LiteralValueOperator extends BaseOperator<TransferableBlock> { +public class LiteralValueOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER"; private final DataSchema _dataSchema; @@ -44,9 +43,8 @@ public class LiteralValueOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index c5b618a533..ee97a99e79 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; @@ -27,7 +28,6 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxIdentifier; @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; * When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list. * When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content. */ -public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { +public class MailboxReceiveOperator extends MultiStageOperator { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class); private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE"; @@ -116,9 +116,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 3c95eb0e3d..a6299ea60a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.Collections; import java.util.List; @@ -27,8 +28,6 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxIdentifier; import org.apache.pinot.query.mailbox.MailboxService; @@ -45,7 +44,7 @@ import org.slf4j.LoggerFactory; /** * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end. */ -public class MailboxSendOperator extends BaseOperator<TransferableBlock> { +public class MailboxSendOperator extends MultiStageOperator { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class); private static final String EXPLAIN_NAME = "MAILBOX_SEND"; @@ -53,7 +52,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> { ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED); - private final Operator<TransferableBlock> _dataTableBlockBaseOperator; + private final MultiStageOperator _dataTableBlockBaseOperator; private final BlockExchange _exchange; @VisibleForTesting @@ -68,7 +67,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> { } public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, - Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, + MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) { this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector, @@ -77,7 +76,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> { @VisibleForTesting MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, - Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, + MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) { _dataTableBlockBaseOperator = dataTableBlockBaseOperator; @@ -110,9 +109,8 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java new file mode 100644 index 0000000000..9c82d1cb2a --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import java.util.List; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.LoggerFactory; + + +public abstract class MultiStageOperator extends BaseOperator<TransferableBlock> implements AutoCloseable { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class); + + // TODO: use the API public List<? extends Operator> getChildOperators() to merge two APIs. + @Override + public List<MultiStageOperator> getChildOperators() { + throw new UnsupportedOperationException(); + } + + // TODO: Ideally close() call should finish within request deadline. + // TODO: Consider passing deadline as part of the API. + @Override + public void close() { + for (MultiStageOperator op : getChildOperators()) { + try { + op.close(); + } catch (Exception e) { + LOGGER.error("Failed to close operator:" + op); + // Continue processing because even one operator failed to be close, we should still close the rest. + } + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index 8ffeec7e22..424d7d003f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -30,15 +30,14 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; * An {@code OpChain} represents a chain of operators that are separated * by send/receive stages. */ -public class OpChain { +public class OpChain implements AutoCloseable { - private final Operator<TransferableBlock> _root; + private final MultiStageOperator _root; private final Set<MailboxIdentifier> _receivingMailbox; private final OpChainStats _stats; private final String _id; - public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> receivingMailboxes, long requestId, - int stageId) { + public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, long requestId, int stageId) { _root = root; _receivingMailbox = new HashSet<>(receivingMailboxes); _id = String.format("%s_%s", requestId, stageId); @@ -61,4 +60,9 @@ public class OpChain { public String toString() { return "OpChain{" + _id + "}"; } + + @Override + public void close() { + _root.close(); + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index 87c14d3be0..4ba3dbde94 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -27,17 +28,15 @@ import javax.annotation.Nullable; import org.apache.calcite.rel.RelFieldCollation; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; -public class SortOperator extends BaseOperator<TransferableBlock> { +public class SortOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "SORT"; - private final Operator<TransferableBlock> _upstreamOperator; + private final MultiStageOperator _upstreamOperator; private final int _fetch; private final int _offset; private final DataSchema _dataSchema; @@ -48,14 +47,14 @@ public class SortOperator extends BaseOperator<TransferableBlock> { private boolean _isSortedBlockConstructed; private TransferableBlock _upstreamErrorBlock; - public SortOperator(Operator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys, + public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) { this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY); } @VisibleForTesting - SortOperator(Operator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys, + SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema, int maxHolderCapacity) { _upstreamOperator = upstreamOperator; @@ -72,9 +71,8 @@ public class SortOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(_upstreamOperator); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java index 80af357cec..83db10d4a2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -19,13 +19,12 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -43,16 +42,16 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; * Note: Function transform only runs functions from v1 engine scalar function factory, which only does argument count * and canonicalized function name matching (lower case). */ -public class TransformOperator extends BaseOperator<TransferableBlock> { +public class TransformOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "TRANSFORM"; - private final Operator<TransferableBlock> _upstreamOperator; + private final MultiStageOperator _upstreamOperator; private final List<TransformOperand> _transformOperandsList; private final int _resultColumnSize; // TODO: Check type matching between resultSchema and the actual result. private final DataSchema _resultSchema; private TransferableBlock _upstreamErrorBlock; - public TransformOperator(Operator<TransferableBlock> upstreamOperator, DataSchema resultSchema, + public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema, List<RexExpression> transforms, DataSchema upstreamDataSchema) { Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty."); Preconditions.checkState(resultSchema.size() == transforms.size(), @@ -67,9 +66,8 @@ public class TransformOperator extends BaseOperator<TransferableBlock> { } @Override - public List<Operator> getChildOperators() { - // WorkerExecutor doesn't use getChildOperators, returns null here. - return null; + public List<MultiStageOperator> getChildOperators() { + return ImmutableList.of(_upstreamOperator); } @Nullable diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 2ba2cc7afc..57a0949bc6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -19,7 +19,6 @@ package org.apache.pinot.query.runtime.plan; import java.util.List; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.AggregateNode; @@ -33,13 +32,13 @@ import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.planner.stage.StageNodeVisitor; import org.apache.pinot.query.planner.stage.TableScanNode; import org.apache.pinot.query.planner.stage.ValueNode; -import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.operator.AggregateOperator; import org.apache.pinot.query.runtime.operator.FilterOperator; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.LiteralValueOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.operator.SortOperator; import org.apache.pinot.query.runtime.operator.TransformOperator; @@ -52,17 +51,17 @@ import org.apache.pinot.query.runtime.operator.TransformOperator; * * <p>This class should be used statically via {@link #build(StageNode, PlanRequestContext)} */ -public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> { +public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, PlanRequestContext> { private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor(); public static OpChain build(StageNode node, PlanRequestContext context) { - Operator<TransferableBlock> root = node.visit(INSTANCE, context); + MultiStageOperator root = node.visit(INSTANCE, context); return new OpChain(root, context.getReceivingMailboxes(), context.getRequestId(), context.getStageId()); } @Override - public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) { + public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) { List<ServerInstance> sendingInstances = context.getMetadataMap().get(node.getSenderStageId()).getServerInstances(); MailboxReceiveOperator mailboxReceiveOperator = new MailboxReceiveOperator(context.getMailboxService(), sendingInstances, @@ -73,8 +72,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab } @Override - public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, PlanRequestContext context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) { + MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId()); return new MailboxSendOperator(context.getMailboxService(), nextOperator, receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(), @@ -82,50 +81,50 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab } @Override - public Operator<TransferableBlock> visitAggregate(AggregateNode node, PlanRequestContext context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) { + MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet()); } @Override - public Operator<TransferableBlock> visitFilter(FilterNode node, PlanRequestContext context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) { + MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition()); } @Override - public Operator<TransferableBlock> visitJoin(JoinNode node, PlanRequestContext context) { + public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) { StageNode left = node.getInputs().get(0); StageNode right = node.getInputs().get(1); - Operator<TransferableBlock> leftOperator = left.visit(this, context); - Operator<TransferableBlock> rightOperator = right.visit(this, context); + MultiStageOperator leftOperator = left.visit(this, context); + MultiStageOperator rightOperator = right.visit(this, context); return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node); } @Override - public Operator<TransferableBlock> visitProject(ProjectNode node, PlanRequestContext context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) { + MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(), node.getInputs().get(0).getDataSchema()); } @Override - public Operator<TransferableBlock> visitSort(SortNode node, PlanRequestContext context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) { + MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema()); } @Override - public Operator<TransferableBlock> visitTableScan(TableScanNode node, PlanRequestContext context) { + public MultiStageOperator visitTableScan(TableScanNode node, PlanRequestContext context) { throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!"); } @Override - public Operator<TransferableBlock> visitValue(ValueNode node, PlanRequestContext context) { + public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) { return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows()); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 5a94c3708f..c38a210e95 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -24,19 +24,20 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.Mockito.clearInvocations; + public class OpChainSchedulerServiceTest { @@ -44,9 +45,9 @@ public class OpChainSchedulerServiceTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _operatorA; + private MultiStageOperator _operatorA; @Mock - private Operator<TransferableBlock> _operatorB; + private MultiStageOperator _operatorB; @Mock private OpChainScheduler _scheduler; @@ -61,16 +62,18 @@ public class OpChainSchedulerServiceTest { _mocks.close(); } - @AfterMethod - public void afterMethod() { - _executor.shutdownNow(); + @BeforeMethod + public void beforeMethod() { + clearInvocations(_scheduler); + clearInvocations(_operatorA); + clearInvocations(_operatorB); } private void initExecutor(int numThreads) { _executor = Executors.newFixedThreadPool(numThreads); } - private OpChain getChain(Operator<TransferableBlock> operator) { + private OpChain getChain(MultiStageOperator operator) { return new OpChain(operator, ImmutableList.of(), 123, 1); } @@ -132,8 +135,7 @@ public class OpChainSchedulerServiceTest { OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor); CountDownLatch latch = new CountDownLatch(1); - Mockito.when(_operatorA.nextBlock()) - .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) + Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) .thenAnswer(inv -> { latch.countDown(); return TransferableBlockUtils.getEndOfStreamTransferableBlock(); @@ -155,9 +157,7 @@ public class OpChainSchedulerServiceTest { // Given: initExecutor(1); Mockito.when(_scheduler.hasNext()).thenReturn(true); - Mockito.when(_scheduler.next()) - .thenReturn(getChain(_operatorA)) - .thenReturn(getChain(_operatorB)) + Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA)).thenReturn(getChain(_operatorB)) .thenReturn(getChain(_operatorA)); OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor); @@ -209,6 +209,7 @@ public class OpChainSchedulerServiceTest { // Then: Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to be called"); scheduler.stopAsync().awaitTerminated(); + Mockito.verify(_scheduler, Mockito.never()).next(); } @@ -238,4 +239,29 @@ public class OpChainSchedulerServiceTest { Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called again"); scheduler.stopAsync().awaitTerminated(); } + + @Test + public void shouldCallCloseOnOperators() + throws InterruptedException { + // Given: + initExecutor(1); + Mockito.when(_scheduler.hasNext()).thenReturn(true).thenReturn(false); + Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA)); + OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor); + + CountDownLatch latch = new CountDownLatch(1); + Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> { + latch.countDown(); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + }); + + // When: + scheduler.startAsync().awaitRunning(); + scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1)); + + // Then: + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds"); + scheduler.stopAsync().awaitTerminated(); + Mockito.verify(_operatorA, Mockito.times(1)).close(); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java index 2bc58bdfaf..bb3faf63ec 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java @@ -20,10 +20,9 @@ package org.apache.pinot.query.runtime.executor; import com.google.common.collect.ImmutableList; import java.util.concurrent.atomic.AtomicLong; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.mailbox.MailboxIdentifier; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; -import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -39,7 +38,7 @@ public class RoundRobinSchedulerTest { private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1_2:foo:2:bar:3"); @Mock - private Operator<TransferableBlock> _operator; + private MultiStageOperator _operator; private AutoCloseable _mocks; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 48b9905523..0996b7f4d0 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -25,8 +25,6 @@ import java.util.List; import org.apache.calcite.sql.SqlKind; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -49,7 +47,7 @@ public class AggregateOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _input; + private MultiStageOperator _input; @BeforeMethod public void setUp() { @@ -212,7 +210,7 @@ public class AggregateOperatorTest { @Test public void testGroupByAggregateWithHashCollision() { - BaseOperator<TransferableBlock> upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); + MultiStageOperator upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); // Create an aggregation call with sum for first column and group by second column. RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0)); AggregateOperator sum0GroupBy1 = diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java index 1e78233234..c53ea2037c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java @@ -24,7 +24,6 @@ import org.apache.calcite.sql.SqlKind; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -41,7 +40,7 @@ import org.testng.annotations.Test; public class FilterOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _upstreamOperator; + private MultiStageOperator _upstreamOperator; @BeforeMethod public void setUp() { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index 0a3ae48974..b9ea6b2078 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -27,7 +27,6 @@ import org.apache.calcite.sql.SqlKind; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.stage.JoinNode; @@ -47,10 +46,10 @@ public class HashJoinOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _leftOperator; + private MultiStageOperator _leftOperator; @Mock - private Operator<TransferableBlock> _rightOperator; + private MultiStageOperator _rightOperator; @BeforeMethod public void setUp() { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 5a4261e4e7..3dc951dca3 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; @@ -45,7 +44,7 @@ public class MailboxSendOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _input; + private MultiStageOperator _input; @Mock private MailboxService<TransferableBlock> _mailboxService; @Mock diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 0537c67ca9..39921e324d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; @@ -51,7 +50,7 @@ public class OperatorTestUtil { private OperatorTestUtil() { } - public static BaseOperator<TransferableBlock> getOperator(String operatorName) { + public static MultiStageOperator getOperator(String operatorName) { return MOCK_OPERATOR_FACTORY.buildMockOperator(operatorName); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java index c712fb601e..2cc6b68a67 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -46,7 +45,7 @@ public class SortOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _input; + private MultiStageOperator _input; @BeforeMethod public void setUp() { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java index 9c6370b5f5..b89313c25c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -46,7 +45,7 @@ public class TransformOperatorTest { private AutoCloseable _mocks; @Mock - private Operator<TransferableBlock> _upstreamOp; + private MultiStageOperator _upstreamOp; @BeforeMethod public void setUp() { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java index 0140576981..94acd49fd8 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java @@ -24,9 +24,9 @@ import java.util.List; import java.util.Map; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -54,8 +54,8 @@ public class MockDataBlockOperatorFactory { } @SuppressWarnings("unchecked") - public BaseOperator<TransferableBlock> buildMockOperator(String operatorName) { - BaseOperator<TransferableBlock> operator = Mockito.mock(BaseOperator.class); + public MultiStageOperator buildMockOperator(String operatorName) { + MultiStageOperator operator = Mockito.mock(MultiStageOperator.class); Mockito.when(operator.nextBlock()).thenAnswer(new Answer<Object>() { private int _invocationCount = 0; public Object answer(InvocationOnMock invocation) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org