This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 941cbf84f9 Do not serialize metrics in each Operator (#10473) 941cbf84f9 is described below commit 941cbf84f912d7cd47f86299c8f4652cd046e097 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Sat Apr 1 19:41:00 2023 +0530 Do not serialize metrics in each Operator (#10473) * WIP: Do not serialize metrics * No need to pass stats between operator. Only collected in the end at the send operator * Use opchain stats to record operatorStats * No need to serialize metrics in receive operator * Remove attachStats method and create stats object inside context itself * Make stats thread safe * Add test for opchain stats * Ensure SendOperator stats are populated before serializing stats * Fix variable scope * Use operator stats map directly from opchain stats * unify return statements outside inner for loop in MailboxSendOperator --------- Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../apache/pinot/query/runtime/QueryRunner.java | 6 +- .../runtime/executor/OpChainSchedulerService.java | 1 - .../LeafStageTransferableBlockOperator.java | 3 +- .../runtime/operator/MailboxReceiveOperator.java | 7 +- .../runtime/operator/MailboxSendOperator.java | 12 +- .../query/runtime/operator/MultiStageOperator.java | 42 ++----- .../pinot/query/runtime/operator/OpChain.java | 11 +- .../pinot/query/runtime/operator/OpChainStats.java | 21 +++- .../query/runtime/operator/OperatorStats.java | 14 +-- .../runtime/operator/utils/OperatorUtils.java | 4 +- .../runtime/plan/OpChainExecutionContext.java | 14 +++ .../query/service/dispatch/QueryDispatcher.java | 25 +++-- .../executor/OpChainSchedulerServiceTest.java | 6 +- .../runtime/executor/RoundRobinSchedulerTest.java | 34 ++++-- .../runtime/operator/MailboxSendOperatorTest.java | 2 +- .../pinot/query/runtime/operator/OpChainTest.java | 124 +++++++++++++++++++++ 16 files changed, 237 insertions(+), 89 deletions(-) diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index b0d0433f42..097b05b455 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -52,6 +52,7 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.executor.RoundRobinScheduler; import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; @@ -213,8 +214,9 @@ public class QueryRunner { OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(), _rootServer, deadlineMs, deadlineMs, distributedStagePlan.getMetadataMap()); - mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, - new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()), + MultiStageOperator leafStageOperator = + new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()); + mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getStageId(), sendNode.getReceiverStageId()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index 3706627349..1f29584dcc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -111,7 +111,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(), result.getDataBlock().getExceptions()); } else { - operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata()); LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 9af6e8819d..a6ed81a3d8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -75,7 +75,8 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null); _currentIndex = 0; for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) { - _operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata()); + OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId()); + operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index fe9782c4c6..f65f9b8d15 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import javax.annotation.Nullable; @@ -215,8 +216,10 @@ public class MailboxReceiveOperator extends MultiStageOperator { return block; } } else { - if (!block.getResultMetadata().isEmpty()) { - _operatorStatsMap.putAll(block.getResultMetadata()); + if (_opChainStats != null && !block.getResultMetadata().isEmpty()) { + for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) { + _opChainStats.getOperatorStatsMap().compute(entry.getKey(), (_key, _value) -> entry.getValue()); + } } eosMailboxCount++; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 3be593a70a..ad0c24abb7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -40,6 +40,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; +import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -151,10 +152,19 @@ public class MailboxSendOperator extends MultiStageOperator { try { transferableBlock = _dataTableBlockBaseOperator.nextBlock(); while (!transferableBlock.isNoOpBlock()) { - _exchange.send(transferableBlock); if (transferableBlock.isEndOfStreamBlock()) { + if (transferableBlock.isSuccessfulEndOfStreamBlock()) { + //Stats need to be populated here because the block is being sent to the mailbox + // and the receiving opChain will not be able to access the stats from the previous opChain + TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( + OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + _exchange.send(eosBlockWithStats); + } else { + _exchange.send(transferableBlock); + } return transferableBlock; } + _exchange.send(transferableBlock); transferableBlock = _dataTableBlockBaseOperator.nextBlock(); } } catch (final Exception e) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 160b3d0f58..881c499c0f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -19,14 +19,9 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Joiner; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; -import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import 
org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.InvocationScope; @@ -37,23 +32,15 @@ import org.slf4j.LoggerFactory; public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class); - // TODO: Move to OperatorContext class. - protected final OperatorStats _operatorStats; - protected final Map<String, OperatorStats> _operatorStatsMap; private final String _operatorId; private final OpChainExecutionContext _context; + protected final OpChainStats _opChainStats; public MultiStageOperator(OpChainExecutionContext context) { _context = context; - _operatorStats = - new OperatorStats(_context, toExplainString()); - _operatorStatsMap = new HashMap<>(); _operatorId = Joiner.on("_").join(toExplainString(), _context.getRequestId(), _context.getStageId(), _context.getServer()); - } - - public Map<String, OperatorStats> getOperatorStatsMap() { - return _operatorStatsMap; + _opChainStats = _context.getStats(); } @Override @@ -62,28 +49,19 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>, throw new EarlyTerminationException("Interrupted while processing next block"); } try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) { - _operatorStats.startTimer(); + OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId); + operatorStats.startTimer(); TransferableBlock nextBlock = getNextBlock(); - _operatorStats.endTimer(nextBlock); - - _operatorStats.recordRow(1, nextBlock.getNumRows()); - if (nextBlock.isEndOfStreamBlock()) { - if (nextBlock.isSuccessfulEndOfStreamBlock()) { - for (MultiStageOperator op : getChildOperators()) { - _operatorStatsMap.putAll(op.getOperatorStatsMap()); - } - if (!_operatorStats.getExecutionStats().isEmpty()) { - _operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), _operatorId); - 
_operatorStatsMap.put(_operatorId, _operatorStats); - } - return TransferableBlockUtils.getEndOfStreamTransferableBlock( - OperatorUtils.getMetadataFromOperatorStats(_operatorStatsMap)); - } - } + operatorStats.recordRow(1, nextBlock.getNumRows()); + operatorStats.endTimer(nextBlock); return nextBlock; } } + public String getOperatorId() { + return _operatorId; + } + // Make it protected because we should always call nextBlock() protected abstract TransferableBlock getNextBlock(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index 84fac943c2..53176787ce 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -37,16 +37,11 @@ public class OpChain implements AutoCloseable { private final OpChainStats _stats; private final OpChainId _id; - public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, int virtualServerId, - long requestId, int stageId) { + public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) { _root = root; _receivingMailbox = new HashSet<>(receivingMailboxes); - _id = new OpChainId(requestId, virtualServerId, stageId); - _stats = new OpChainStats(_id.toString()); - } - - public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) { - this(root, receivingMailboxes, context.getServer().virtualId(), context.getRequestId(), context.getStageId()); + _id = context.getId(); + _stats = context.getStats(); } public Operator<TransferableBlock> getRoot() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java index 60dba3a0de..5b2cc2a065 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java @@ -21,12 +21,13 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Stopwatch; import com.google.common.base.Suppliers; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; @@ -50,7 +51,7 @@ public class OpChainStats { private final AtomicLong _queuedCount = new AtomicLong(); private final String _id; - private Map<String, OperatorStats> _operatorStatsMap = new HashMap<>(); + private final ConcurrentHashMap<String, OperatorStats> _operatorStatsMap = new ConcurrentHashMap<>(); public OpChainStats(String id) { _id = id; @@ -73,12 +74,16 @@ public class OpChainStats { } } - public Map<String, OperatorStats> getOperatorStatsMap() { + public ConcurrentHashMap<String, OperatorStats> getOperatorStatsMap() { return _operatorStatsMap; } - public void setOperatorStatsMap(Map<String, OperatorStats> operatorStatsMap) { - _operatorStatsMap = operatorStatsMap; + public OperatorStats getOperatorStats(OpChainExecutionContext context, String operatorId) { + return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> { + OperatorStats operatorStats = new OperatorStats(context); + operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId); + return operatorStats; + }); } private void startExecutionTimer() 
{ @@ -89,6 +94,10 @@ public class OpChainStats { } } + public long getExecutionTime() { + return _executeStopwatch.elapsed(TimeUnit.MILLISECONDS); + } + @Override public String toString() { return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(), diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java index 52ca89a3ee..2655a5c286 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java @@ -35,9 +35,8 @@ public class OperatorStats { // TODO: add a operatorId for better tracking purpose. private final int _stageId; private final long _requestId; - private final VirtualServerAddress _serverAddress; - private final String _operatorType; + private final VirtualServerAddress _serverAddress; private int _numBlock = 0; private int _numRows = 0; @@ -45,16 +44,15 @@ public class OperatorStats { private final Map<String, String> _executionStats; private boolean _processingStarted = false; - public OperatorStats(OpChainExecutionContext context, String operatorType) { - this(context.getRequestId(), context.getStageId(), context.getServer(), operatorType); + public OperatorStats(OpChainExecutionContext context) { + this(context.getRequestId(), context.getStageId(), context.getServer()); } //TODO: remove this constructor after the context constructor can be used in serialization and deserialization - public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) { + public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress) { _stageId = stageId; _requestId = requestId; _serverAddress = serverAddress; - _operatorType = operatorType; _executionStats = new HashMap<>(); } 
@@ -115,10 +113,6 @@ public class OperatorStats { return _serverAddress; } - public String getOperatorType() { - return _operatorType; - } - @Override public String toString() { return OperatorUtils.operatorStatsToJson(this); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java index ab79339891..601f169cb3 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java @@ -81,7 +81,6 @@ public class OperatorUtils { jsonOut.put("requestId", operatorStats.getRequestId()); jsonOut.put("stageId", operatorStats.getStageId()); jsonOut.put("serverAddress", operatorStats.getServerAddress().toString()); - jsonOut.put("operatorType", operatorStats.getOperatorType()); jsonOut.put("executionStats", operatorStats.getExecutionStats()); return JsonUtils.objectToString(jsonOut); } catch (Exception e) { @@ -97,10 +96,9 @@ public class OperatorUtils { int stageId = operatorStatsNode.get("stageId").asInt(); String serverAddressStr = operatorStatsNode.get("serverAddress").asText(); VirtualServerAddress serverAddress = VirtualServerAddress.parse(serverAddressStr); - String operatorType = operatorStatsNode.get("operatorType").asText(); OperatorStats operatorStats = - new OperatorStats(requestId, stageId, serverAddress, operatorType); + new OperatorStats(requestId, stageId, serverAddress); operatorStats.recordExecutionStats( JsonUtils.jsonNodeToObject(operatorStatsNode.get("executionStats"), new TypeReference<Map<String, String>>() { })); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index 1f500ce8b2..64141a024b 
100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -23,6 +23,8 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.OpChainId; +import org.apache.pinot.query.runtime.operator.OpChainStats; /** @@ -38,6 +40,8 @@ public class OpChainExecutionContext { private final long _timeoutMs; private final long _deadlineMs; private final Map<Integer, StageMetadata> _metadataMap; + private final OpChainId _id; + private final OpChainStats _stats; public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId, VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) { @@ -48,6 +52,8 @@ public class OpChainExecutionContext { _timeoutMs = timeoutMs; _deadlineMs = deadlineMs; _metadataMap = metadataMap; + _id = new OpChainId(requestId, server.virtualId(), stageId); + _stats = new OpChainStats(_id.toString()); } public OpChainExecutionContext(PlanRequestContext planRequestContext) { @@ -83,4 +89,12 @@ public class OpChainExecutionContext { public Map<Integer, StageMetadata> getMetadataMap() { return _metadataMap; } + + public OpChainId getId() { + return _id; + } + + public OpChainStats getStats() { + return _stats; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 4d01c67105..36c93a5827 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -55,6 +55,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; +import org.apache.pinot.query.runtime.operator.OpChainStats; import org.apache.pinot.query.runtime.operator.OperatorStats; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -175,12 +176,16 @@ public class QueryDispatcher { public static ResultTable runReducer(long requestId, QueryPlan queryPlan, int reduceStageId, long timeoutMs, MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) { MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); + VirtualServerAddress server = + new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0); + OpChainExecutionContext context = + new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, timeoutMs, + queryPlan.getStageMetadataMap()); MailboxReceiveOperator mailboxReceiveOperator = - createReduceStageOperator(mailboxService, queryPlan.getStageMetadataMap(), requestId, - reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), - new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs); + createReduceStageOperator( + reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), context); List<DataBlock> resultDataBlocks = - reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan); + reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan, context.getStats()); return 
toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()); } @@ -193,7 +198,8 @@ public class QueryDispatcher { } private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs, - @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan) { + @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan, + OpChainStats stats) { List<DataBlock> resultDataBlocks = new ArrayList<>(); TransferableBlock transferableBlock; long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L; @@ -209,7 +215,7 @@ public class QueryDispatcher { continue; } else if (transferableBlock.isEndOfStreamBlock()) { if (executionStatsAggregatorMap != null) { - for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) { + for (Map.Entry<String, OperatorStats> entry : stats.getOperatorStatsMap().entrySet()) { LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(), OperatorUtils.operatorStatsToJson(entry.getValue())); OperatorStats operatorStats = entry.getValue(); @@ -276,11 +282,8 @@ public class QueryDispatcher { return new DataSchema(colNames, colTypes); } - private static MailboxReceiveOperator createReduceStageOperator(MailboxService<TransferableBlock> mailboxService, - Map<Integer, StageMetadata> stageMetadataMap, long jobId, int stageId, int reducerStageId, DataSchema dataSchema, - VirtualServerAddress server, long timeoutMs) { - OpChainExecutionContext context = - new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap); + private static MailboxReceiveOperator createReduceStageOperator(int stageId, int reducerStageId, + DataSchema dataSchema, OpChainExecutionContext context) { // timeout is set for reduce stage MailboxReceiveOperator mailboxReceiveOperator = new 
MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(), diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 345b4ca51f..a30adf04f4 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -24,9 +24,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; @@ -71,7 +73,9 @@ public class OpChainSchedulerServiceTest { } private OpChain getChain(MultiStageOperator operator) { - return new OpChain(operator, ImmutableList.of(), 1, 123, 1); + VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1); + OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null); + return new OpChain(context, operator, ImmutableList.of()); } @Test diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java index 88974b1cb3..34b39a697b 100644 --- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java @@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableList; import java.util.concurrent.TimeUnit; import org.apache.pinot.query.mailbox.JsonMailboxIdentifier; import org.apache.pinot.query.mailbox.MailboxIdentifier; +import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -76,8 +78,9 @@ public class RoundRobinSchedulerTest { @Test public void testSchedulerHappyPath() throws InterruptedException { - OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID, - 123, DEFAULT_RECEIVER_STAGE_ID); + OpChain chain = + new OpChain(getOpChainExecutionContext(DEFAULT_RECEIVER_STAGE_ID, 123, DEFAULT_VIRTUAL_SERVER_ID), _operator, + ImmutableList.of(MAILBOX_1)); _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS); _scheduler.register(chain); @@ -102,8 +105,9 @@ public class RoundRobinSchedulerTest { @Test public void testSchedulerWhenSenderDies() throws InterruptedException { - OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID, - DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID); + OpChain chain = new OpChain( + getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, DEFAULT_VIRTUAL_SERVER_ID), _operator, + ImmutableList.of(MAILBOX_1)); _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS); _scheduler.register(chain); @@ -132,12 +136,17 @@ public class RoundRobinSchedulerTest { // When parallelism is > 1, multiple OpChains with the same requestId and stageId 
would be registered in the same // scheduler. Data received on a given mailbox should wake up exactly 1 OpChain corresponding to the virtual // server-id determined by the Mailbox. - OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), MAILBOX_1.getToHost().virtualId(), - DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID); - OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), MAILBOX_2.getToHost().virtualId(), - DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID); - OpChain chain3 = new OpChain(_operator, ImmutableList.of(MAILBOX_3), MAILBOX_3.getToHost().virtualId(), - DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID); + OpChain chain1 = new OpChain( + getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_1.getToHost().virtualId()), + _operator, ImmutableList.of(MAILBOX_1)); + OpChain chain2 = new OpChain( + getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_2.getToHost().virtualId()), + _operator, ImmutableList.of(MAILBOX_2)); + OpChain chain3 = new OpChain( + getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_3.getToHost().virtualId()), + _operator, ImmutableList.of(MAILBOX_3)); + + // Register 3 OpChains. Keep release timeout high to avoid unintended OpChain wake-ups. 
_scheduler = new RoundRobinScheduler(10_000); _scheduler.register(chain1); @@ -172,4 +181,9 @@ public class RoundRobinSchedulerTest { Assert.assertEquals(0, _scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize()); } + + private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) { + return new OpChainExecutionContext(null, requestId, stageId, + new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 72d5ba66b8..25b307b22b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -169,7 +169,7 @@ public class MailboxSendOperatorTest { // Then: Assert.assertTrue(block.isEndOfStreamBlock(), "expected EOS block to propagate"); - Mockito.verify(_exchange).send(eosBlock); + Assert.assertEquals(block, eosBlock); } @Test diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java new file mode 100644 index 0000000000..6f83f9ac77 --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class OpChainTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private MultiStageOperator _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testExecutionTimerStats() {
+    Mockito.when(_upstreamOperator.nextBlock()).then(x -> {
+      Thread.sleep(1000);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+
+    Mockito.when(_upstreamOperator.nextBlock()).then(x -> {
+      Thread.sleep(20);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 20);
+    Assert.assertTrue(opChain.getStats().getExecutionTime() < 100);
+  }
+
+  @Test
+  public void testStatsCollection() {
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+    DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
+
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 1);
+    Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(dummyMultiStageOperator.getOperatorId()));
+
+    Map<String, String> executionStats =
+        opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats();
+    Assert.assertTrue(
+        Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName())) >= 1000);
+    Assert.assertTrue(
+        Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName())) <= 2000);
+  }
+
+  static class DummyMultiStageOperator extends MultiStageOperator {
+    public DummyMultiStageOperator(OpChainExecutionContext context) {
+      super(context);
+    }
+
+    @Override
+    protected TransferableBlock getNextBlock() {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller instead of swallowing it
+      }
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+
+    @Nullable
+    @Override
+    public String toExplainString() {
+      return "DUMMY";
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org