This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5416652ff5  [multistage] [debuggability] OpChain and operator stats (#10094)
5416652ff5 is described below

commit 5416652ff59b150d455c1f4764d2e0bc561d8cf3
Author:     Yao Liu <y...@startree.ai>
AuthorDate: Thu Jan 19 08:54:43 2023 -0800

    [multistage] [debuggability] OpChain and operator stats (#10094)
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  62 +++++-----
 .../runtime/executor/OpChainSchedulerService.java  |  10 +-
 .../query/runtime/operator/AggregateOperator.java  |  86 +++++++------
 .../query/runtime/operator/FilterOperator.java     |  22 +++-
 .../query/runtime/operator/HashJoinOperator.java   |  30 ++++-
 .../LeafStageTransferableBlockOperator.java        |  50 +++++---
 .../runtime/operator/LiteralValueOperator.java     |  27 +++-
 .../runtime/operator/MailboxReceiveOperator.java   |  99 ++++++++-------
 .../runtime/operator/MailboxSendOperator.java      |  21 +++-
 .../pinot/query/runtime/operator/OpChain.java      |   1 +
 .../query/runtime/operator/OpChainStats.java       |  20 +--
 .../query/runtime/operator/OperatorStats.java      |  78 ++++++++++++
 .../pinot/query/runtime/operator/SortOperator.java |  26 +++-
 .../query/runtime/operator/TransformOperator.java  |  23 +++-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  15 ++-
 .../pinot/query/service/QueryDispatcher.java       |  12 +-
 .../runtime/operator/AggregateOperatorTest.java    |  41 +++----
 .../query/runtime/operator/FilterOperatorTest.java |  28 ++---
 .../runtime/operator/HashJoinOperatorTest.java     |  35 +++---
 .../LeafStageTransferableBlockOperatorTest.java    | 136 +++++++++++----------
 .../runtime/operator/LiteralValueOperatorTest.java |  25 +++-
 .../runtime/operator/MailboxSendOperatorTest.java  |  10 +-
 .../query/runtime/operator/SortOperatorTest.java   |  30 ++---
 .../runtime/operator/TransformOperatorTest.java    |  20 +--
 24 files changed, 585 insertions(+), 322 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 41571fcd76..ad0d138bb4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -88,24 +88,20 @@ public class QueryRunner { * Initializes the query executor. * <p>Should be called only once and before calling any other method. */ - public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, - HelixManager helixManager, ServerMetrics serverMetrics) { + public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager, + ServerMetrics serverMetrics) { String instanceName = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME); _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ?
instanceName.substring( CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName; _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); _helixManager = helixManager; try { - long releaseMs = config.getProperty( - QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, + long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS); - _scheduler = new OpChainSchedulerService( - new RoundRobinScheduler(releaseMs), - Executors.newFixedThreadPool( - ResourceManager.DEFAULT_QUERY_WORKER_THREADS, - new NamedThreadFactory("query_worker_on_" + _port + "_port")), - releaseMs); + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), + Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, + new NamedThreadFactory("query_worker_on_" + _port + "_port")), releaseMs); _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); @@ -130,12 +126,14 @@ public class QueryRunner { } public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) { + long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); if (isLeafStage(distributedStagePlan)) { // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table // and package it here for return. But we should really use a MailboxSendOperator directly put into the // server executor. - List<ServerPlanRequestContext> serverQueryRequests = constructServerQueryRequests(distributedStagePlan, - requestMetadataMap, _helixPropertyStore, _mailboxService); + long leafStageStartMillis = System.currentTimeMillis(); + List<ServerPlanRequestContext> serverQueryRequests = + constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService); // send the data table via mailbox in one-off fashion (e.g. 
no block-level split, one data table/partition key) List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size()); @@ -144,25 +142,27 @@ public class QueryRunner { new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()); serverQueryResults.add(processServerQuery(request, _scheduler.getWorkerPool())); } - + LOGGER.debug( + "RequestId:" + requestId + " StageId:" + distributedStagePlan.getStageId() + " Leaf stage v1 processing time:" + + (System.currentTimeMillis() - leafStageStartMillis) + " ms"); MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()); - MailboxSendOperator mailboxSendOperator = - new MailboxSendOperator(_mailboxService, - new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema()), - receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), - sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(), - sendNode.getStageId()); + MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService, + new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId, + sendNode.getStageId()), receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), + sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(), + sendNode.getStageId()); int blockCounter = 0; while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) { LOGGER.debug("Acquired transferable block: {}", blockCounter++); } + mailboxSendOperator.toExplainString(); } else { - long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); StageNode stageRoot = distributedStagePlan.getStageRoot(); - OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(_mailboxService, requestId, - stageRoot.getStageId(), timeoutMs, _hostname, _port, distributedStagePlan.getMetadataMap())); + OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, + new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, _hostname, _port, + distributedStagePlan.getMetadataMap())); _scheduler.register(rootOperator); } } @@ -174,8 +174,8 @@ public class QueryRunner { Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, "Server request for V2 engine should only have 1 scan table per request."); String rawTableName = stageMetadata.getScannedTables().get(0); - Map<String, List<String>> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap() - .get(distributedStagePlan.getServerInstance()); + Map<String, List<String>> tableToSegmentListMap = + stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance()); List<ServerPlanRequestContext> requests = new ArrayList<>(); for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) { String tableType = tableEntry.getKey(); @@ -187,15 +187,17 @@ public class QueryRunner { TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - 
requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, - tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); + requests.add( + ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig, + schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); } else if (TableType.REALTIME.name().equals(tableType)) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, - tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); + requests.add( + ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig, + schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); } @@ -209,8 +211,8 @@ public class QueryRunner { return _serverExecutor.execute(serverQueryRequest, executorService); } catch (Exception e) { InstanceResponseBlock errorResponse = new InstanceResponseBlock(); - errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, - e.getMessage() + QueryException.getTruncatedStackTrace(e)); + errorResponse.getExceptions() + .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e)); return errorResponse; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index 21a944d4f7..6c463eeeed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -114,15 +114,18 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { register(operatorChain, false); } else { if (result.isErrorBlock()) { + operatorChain.getRoot().toExplainString(); LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(), result.getDataBlock().getExceptions()); } else { + operatorChain.getRoot().toExplainString(); LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats()); } operatorChain.close(); } } catch (Exception e) { operatorChain.close(); + operatorChain.getRoot().toExplainString(); LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e); } } @@ -154,11 +157,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. 
" + "There are a total of {} chains awaiting execution.", operatorChain, operatorChain.getReceivingMailbox(), _scheduler.size()); - - // we want to track the time that it takes from registering - // an operator chain to when it completes, so make sure to - // start the timer here - operatorChain.getStats().startExecutionTimer(); } public final void register(OpChain operatorChain, boolean isNew) { @@ -167,8 +165,8 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", operatorChain, isNew, _scheduler.size()); _scheduler.register(operatorChain, isNew); - operatorChain.getStats().queued(); } finally { + operatorChain.getStats().queued(); _monitor.leave(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index 3182dba91f..199d8c8b96 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -37,6 +37,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.segment.local.customobject.PinotFourthMoment; import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -55,8 +57,10 @@ import org.apache.pinot.spi.data.FieldSpec; */ public class AggregateOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR"; + private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class); private final MultiStageOperator _inputOperator; + // TODO: Deal with the case where _aggCalls is empty but we have groupSet setup, which means this is a Distinct call. private final List<RexExpression.FunctionCall> _aggCalls; private final List<RexExpression> _groupSet; @@ -69,27 +73,29 @@ public class AggregateOperator extends MultiStageOperator { private boolean _readyToConstruct; private boolean _hasReturnedAggregateBlock; + // TODO: Move to OperatorContext class. + private OperatorStats _operatorStats; + // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator. // aggCalls has to be a list of FunctionCall and cannot be null // groupSet has to be a list of InputRef and cannot be null // TODO: Add these two checks when we confirm we can handle error in upstream ctor call. 
- public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, - List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema) { - this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS); + public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls, + List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId) { + this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS, requestId, + stageId); } @VisibleForTesting - AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, - List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema, Map<String, - Function<DataSchema.ColumnDataType, Merger>> mergers) { + AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls, + List<RexExpression> groupSet, DataSchema inputSchema, + Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) { _inputOperator = inputOperator; _groupSet = groupSet; _upstreamErrorBlock = null; // we expect all agg calls to be aggregate function calls - _aggCalls = aggCalls.stream() - .map(RexExpression.FunctionCall.class::cast) - .collect(Collectors.toList()); + _aggCalls = aggCalls.stream().map(RexExpression.FunctionCall.class::cast).collect(Collectors.toList()); _accumulators = new Accumulator[_aggCalls.size()]; for (int i = 0; i < _aggCalls.size(); i++) { @@ -105,6 +111,7 @@ public class AggregateOperator extends MultiStageOperator { _resultSchema = dataSchema; _readyToConstruct = false; _hasReturnedAggregateBlock = false; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } @Override @@ -115,11 +122,15 @@ public class AggregateOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + // TODO: move to close call; + _inputOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); try { if (!_readyToConstruct && !consumeInputBlocks()) { return TransferableBlockUtils.getNoOpTransferableBlock(); @@ -132,10 +143,13 @@ public class AggregateOperator extends MultiStageOperator { if (!_hasReturnedAggregateBlock) { return produceAggregatedBlock(); } else { + // TODO: Move to close call. 
return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); + } finally { + _operatorStats.endTimer(); } } @@ -154,6 +168,7 @@ public class AggregateOperator extends MultiStageOperator { if (rows.size() == 0) { return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } else { + _operatorStats.recordOutput(1, rows.size()); return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } } @@ -162,7 +177,9 @@ public class AggregateOperator extends MultiStageOperator { * @return whether or not the operator is ready to move on (EOS or ERROR) */ private boolean consumeInputBlocks() { + _operatorStats.endTimer(); TransferableBlock block = _inputOperator.nextBlock(); + _operatorStats.startTimer(); while (!block.isNoOpBlock()) { // setting upstream error block if (block.isErrorBlock()) { @@ -181,7 +198,10 @@ public class AggregateOperator extends MultiStageOperator { _accumulators[i].accumulate(key, row); } } + _operatorStats.recordInput(1, container.size()); + _operatorStats.endTimer(); block = _inputOperator.nextBlock(); + _operatorStats.startTimer(); } return false; } @@ -269,32 +289,25 @@ public class AggregateOperator extends MultiStageOperator { } private static class Accumulator { - - private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS = ImmutableMap - .<String, Function<DataSchema.ColumnDataType, Merger>>builder() - .put("SUM", cdt -> AggregateOperator::mergeSum) - .put("$SUM", cdt -> AggregateOperator::mergeSum) - .put("$SUM0", cdt -> AggregateOperator::mergeSum) - .put("MIN", cdt -> AggregateOperator::mergeMin) - .put("$MIN", cdt -> AggregateOperator::mergeMin) - .put("$MIN0", cdt -> AggregateOperator::mergeMin) - .put("MAX", cdt -> AggregateOperator::mergeMax) - .put("$MAX", cdt -> AggregateOperator::mergeMax) - .put("$MAX0", cdt -> AggregateOperator::mergeMax) - .put("COUNT", cdt -> AggregateOperator::mergeCount) - .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) - .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) - .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd) - .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) - .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) - .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr) - .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT - ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) - .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT - ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) - .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT - ? 
new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) - .build(); + private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS = + ImmutableMap.<String, Function<DataSchema.ColumnDataType, Merger>>builder() + .put("SUM", cdt -> AggregateOperator::mergeSum).put("$SUM", cdt -> AggregateOperator::mergeSum) + .put("$SUM0", cdt -> AggregateOperator::mergeSum).put("MIN", cdt -> AggregateOperator::mergeMin) + .put("$MIN", cdt -> AggregateOperator::mergeMin).put("$MIN0", cdt -> AggregateOperator::mergeMin) + .put("MAX", cdt -> AggregateOperator::mergeMax).put("$MAX", cdt -> AggregateOperator::mergeMax) + .put("$MAX0", cdt -> AggregateOperator::mergeMax).put("COUNT", cdt -> AggregateOperator::mergeCount) + .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) + .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) + .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd) + .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) + .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) + .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr).put("FOURTHMOMENT", + cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject() + : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT", + cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject() + : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT0", + cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject() + : new MergeFourthMomentNumeric()).build(); final int _inputRef; final Object _literal; @@ -336,8 +349,7 @@ public class AggregateOperator extends MultiStageOperator { private RexExpression toAggregationFunctionOperand(RexExpression.FunctionCall rexExpression) { List<RexExpression> functionOperands = rexExpression.getFunctionOperands(); Preconditions.checkState(functionOperands.size() < 2, "aggregate functions cannot have more than one operand"); - return functionOperands.size() > 0 - ? functionOperands.get(0) + return functionOperands.size() > 0 ? functionOperands.get(0) : new RexExpression.Literal(FieldSpec.DataType.INT, 1); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index 3ae9eac98f..6f57ece6df 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -29,6 +29,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* @@ -47,15 +49,21 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; public class FilterOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "FILTER"; private final MultiStageOperator _upstreamOperator; + private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class); private final TransformOperand _filterOperand; private final DataSchema _dataSchema; private TransferableBlock _upstreamErrorBlock; - public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter) { + // TODO: Move to OperatorContext class. 
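// (Illustrative aside; these comment lines are not part of the diff hunk. The constructor
// changes in this and the following operators all share one pattern: each operator now also
// accepts the pair (requestId, stageId), used only to seed its per-operator stats holder,
//
//   _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
//
// with both values threaded down from PlanRequestContext by PhysicalPlanVisitor.)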
+ private OperatorStats _operatorStats; + + public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter, + long requestId, int stageId) { _upstreamOperator = upstreamOperator; _dataSchema = dataSchema; _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema); _upstreamErrorBlock = null; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } @Override @@ -66,15 +74,23 @@ public class FilterOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + _upstreamOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); try { - return transform(_upstreamOperator.nextBlock()); + _operatorStats.endTimer(); + TransferableBlock block = _upstreamOperator.nextBlock(); + _operatorStats.startTimer(); + return transform(block); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); + } finally { + _operatorStats.endTimer(); } } @@ -97,6 +113,8 @@ public class FilterOperator extends MultiStageOperator { resultRows.add(row); } } + _operatorStats.recordInput(1, container.size()); + _operatorStats.recordOutput(1, resultRows.size()); return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index be47c9f389..b4e88965cb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -39,6 +39,9 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm. @@ -55,6 +58,8 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; // TODO: Move inequi out of hashjoin. 
(https://github.com/apache/pinot/issues/9728) public class HashJoinOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "HASH_JOIN"; + private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class); + private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL); @@ -82,8 +87,10 @@ public class HashJoinOperator extends MultiStageOperator { private KeySelector<Object[], Object[]> _leftKeySelector; private KeySelector<Object[], Object[]> _rightKeySelector; + private OperatorStats _operatorStats; + public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator, - DataSchema leftSchema, JoinNode node) { + DataSchema leftSchema, JoinNode node, long requestId, int stageId) { Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()), "Join type: " + node.getJoinRelType() + " is not supported!"); _joinType = node.getJoinRelType(); @@ -111,6 +118,7 @@ public class HashJoinOperator extends MultiStageOperator { _matchedRightRows = null; } _upstreamErrorBlock = null; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } // TODO: Separate left and right table operator. @@ -122,11 +130,15 @@ public class HashJoinOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + _leftTableOperator.toExplainString(); + _rightTableOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); try { if (_isTerminated) { return TransferableBlockUtils.getEndOfStreamTransferableBlock(); @@ -140,15 +152,22 @@ public class HashJoinOperator extends MultiStageOperator { } else if (!_isHashTableBuilt) { return TransferableBlockUtils.getNoOpTransferableBlock(); } + _operatorStats.endTimer(); + TransferableBlock leftBlock = _leftTableOperator.nextBlock(); + _operatorStats.startTimer(); // JOIN each left block with the right block. 
- return buildJoinedDataBlock(_leftTableOperator.nextBlock()); + return buildJoinedDataBlock(leftBlock); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); + } finally { + _operatorStats.endTimer(); } } private void buildBroadcastHashTable() { + _operatorStats.endTimer(); TransferableBlock rightBlock = _rightTableOperator.nextBlock(); + _operatorStats.startTimer(); while (!rightBlock.isNoOpBlock()) { if (rightBlock.isErrorBlock()) { _upstreamErrorBlock = rightBlock; @@ -165,8 +184,10 @@ public class HashJoinOperator extends MultiStageOperator { _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>()); hashCollection.add(row); } - + _operatorStats.recordInput(1, container.size()); + _operatorStats.endTimer(); rightBlock = _rightTableOperator.nextBlock(); + _operatorStats.startTimer(); } } @@ -196,6 +217,7 @@ public class HashJoinOperator extends MultiStageOperator { } } _isTerminated = true; + _operatorStats.recordOutput(1, returnRows.size()); return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW); } List<Object[]> rows = new ArrayList<>(); @@ -230,6 +252,8 @@ public class HashJoinOperator extends MultiStageOperator { rows.add(joinRow(leftRow, null)); } } + _operatorStats.recordInput(1, container.size()); + _operatorStats.recordOutput(1, rows.size()); return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index e794a84194..baf7373a07 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -40,6 +40,8 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -57,17 +59,23 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; */ public class LeafStageTransferableBlockOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR"; + private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class); private final InstanceResponseBlock _errorBlock; private final List<InstanceResponseBlock> _baseResultBlock; private final DataSchema _desiredDataSchema; private int _currentIndex; - public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) { + // TODO: Move to OperatorContext class. 
+ private OperatorStats _operatorStats; + + public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema, + long requestId, int stageId) { _baseResultBlock = baseResultBlock; _desiredDataSchema = dataSchema; _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null); _currentIndex = 0; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } @Override @@ -78,29 +86,39 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { - if (_currentIndex < 0) { - throw new RuntimeException("Leaf transfer terminated. next block should no longer be called."); - } - if (_errorBlock != null) { - _currentIndex = -1; - return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions())); - } else { - if (_currentIndex < _baseResultBlock.size()) { - InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++); - if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) { - return composeTransferableBlock(responseBlock, _desiredDataSchema); + try { + _operatorStats.startTimer(); + if (_currentIndex < 0) { + throw new RuntimeException("Leaf transfer terminated. next block should no longer be called."); + } + if (_errorBlock != null) { + _currentIndex = -1; + return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions())); + } else { + if (_currentIndex < _baseResultBlock.size()) { + InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++); + if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) { + _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows()); + _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows()); + return composeTransferableBlock(responseBlock, _desiredDataSchema); + } else { + _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows()); + _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows()); + return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW); + } } else { - return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW); + _currentIndex = -1; + return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); } - } else { - _currentIndex = -1; - return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); } + } finally { + _operatorStats.endTimer(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index cace9fa974..8fc160f205 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -27,17 +27,24 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class 
LiteralValueOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER"; + private static final Logger LOGGER = LoggerFactory.getLogger(LiteralValueOperator.class); private final DataSchema _dataSchema; private final TransferableBlock _rexLiteralBlock; private boolean _isLiteralBlockReturned; - public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows) { + private OperatorStats _operatorStats; + + public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows, + long requestId, int stageId) { _dataSchema = dataSchema; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); _rexLiteralBlock = constructBlock(rexLiteralRows); _isLiteralBlockReturned = false; } @@ -50,16 +57,22 @@ public class LiteralValueOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { - if (!_isLiteralBlockReturned) { - _isLiteralBlockReturned = true; - return _rexLiteralBlock; - } else { - return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + try { + _operatorStats.startTimer(); + if (!_isLiteralBlockReturned) { + _isLiteralBlockReturned = true; + return _rexLiteralBlock; + } else { + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + } + } finally { + _operatorStats.endTimer(); } } @@ -72,6 +85,8 @@ public class LiteralValueOperator extends MultiStageOperator { } blockContent.add(row); } + _operatorStats.recordInput(1, blockContent.size()); + _operatorStats.recordOutput(1, blockContent.size()); return new TransferableBlock(blockContent, _dataSchema, DataBlock.Type.ROW); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index ee97a99e79..aa569ba56f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -65,6 +65,7 @@ public class MailboxReceiveOperator extends MultiStageOperator { private final long _deadlineTimestampNano; private int _serverIdx; private TransferableBlock _upstreamErrorBlock; + private OperatorStats _operatorStats; private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, long jobId, long stageId, String receiveHostName, int receivePort) { @@ -109,6 +110,7 @@ public class MailboxReceiveOperator extends MultiStageOperator { } _upstreamErrorBlock = null; _serverIdx = 0; + _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME); } public List<MailboxIdentifier> getSendingMailbox() { @@ -123,61 +125,68 @@ public class MailboxReceiveOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { - if (_upstreamErrorBlock != null) { - return _upstreamErrorBlock; - } else if (System.nanoTime() >= _deadlineTimestampNano) { - LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox); - return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); - } + try { + _operatorStats.startTimer(); + if (_upstreamErrorBlock != null) { + return 
_upstreamErrorBlock; + } else if (System.nanoTime() >= _deadlineTimestampNano) { + return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); + } - int startingIdx = _serverIdx; - int openMailboxCount = 0; - int eosMailboxCount = 0; - - // For all non-singleton distribution, we poll from every instance to check mailbox content. - // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content. - for (int i = 0; i < _sendingMailbox.size(); i++) { - // this implements a round-robin mailbox iterator, so we don't starve any mailboxes - _serverIdx = (startingIdx + i) % _sendingMailbox.size(); - MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx); - try { - ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId); - if (!mailbox.isClosed()) { - openMailboxCount++; - TransferableBlock block = mailbox.receive(); - - // Get null block when pulling times out from mailbox. - if (block != null) { - if (block.isErrorBlock()) { - _upstreamErrorBlock = - TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions()); - return _upstreamErrorBlock; - } - if (!block.isEndOfStreamBlock()) { - return block; - } else { - eosMailboxCount++; + int startingIdx = _serverIdx; + int openMailboxCount = 0; + int eosMailboxCount = 0; + + // For all non-singleton distribution, we poll from every instance to check mailbox content. + // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content. + for (int i = 0; i < _sendingMailbox.size(); i++) { + // this implements a round-robin mailbox iterator, so we don't starve any mailboxes + _serverIdx = (startingIdx + i) % _sendingMailbox.size(); + MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx); + try { + ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId); + if (!mailbox.isClosed()) { + openMailboxCount++; + TransferableBlock block = mailbox.receive(); + // Get null block when pulling times out from mailbox. + if (block != null) { + if (block.isErrorBlock()) { + _upstreamErrorBlock = + TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions()); + return _upstreamErrorBlock; + } + if (!block.isEndOfStreamBlock()) { + _operatorStats.recordInput(1, block.getNumRows()); + _operatorStats.recordOutput(1, block.getNumRows()); + return block; + } else { + eosMailboxCount++; + } } } + } catch (Exception e) { + return TransferableBlockUtils.getErrorTransferableBlock( + new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e)); } - } catch (Exception e) { - return TransferableBlockUtils.getErrorTransferableBlock( - new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e)); } - } - // there are two conditions in which we should return EOS: (1) there were - // no mailboxes to open (this shouldn't happen because the second condition - // should be hit first, but is defensive) (2) every mailbox that was opened - // returned an EOS block. in every other scenario, there are mailboxes that - // are not yet exhausted and we should wait for more data to be available - return openMailboxCount > 0 && openMailboxCount > eosMailboxCount - ? 
TransferableBlockUtils.getNoOpTransferableBlock() - : TransferableBlockUtils.getEndOfStreamTransferableBlock(); + // there are two conditions in which we should return EOS: (1) there were + // no mailboxes to open (this shouldn't happen because the second condition + // should be hit first, but is defensive) (2) every mailbox that was opened + // returned an EOS block. in every other scenario, there are mailboxes that + // are not yet exhausted and we should wait for more data to be available + TransferableBlock block = + openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock() + : TransferableBlockUtils.getEndOfStreamTransferableBlock(); + return block; + } finally { + _operatorStats.endTimer(); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index a6299ea60a..79d64bfefe 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -54,6 +54,7 @@ public class MailboxSendOperator extends MultiStageOperator { private final MultiStageOperator _dataTableBlockBaseOperator; private final BlockExchange _exchange; + private OperatorStats _operatorStats; @VisibleForTesting interface BlockExchangeFactory { @@ -71,14 +72,14 @@ public class MailboxSendOperator extends MultiStageOperator { RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) { this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector, - server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange); + server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange, jobId, stageId); } @VisibleForTesting MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, - MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) { + MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int stageId) { _dataTableBlockBaseOperator = dataTableBlockBaseOperator; List<MailboxIdentifier> receivingMailboxes; @@ -106,6 +107,7 @@ public class MailboxSendOperator extends MultiStageOperator { Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType), String.format("Exchange type '%s' is not supported yet", exchangeType)); + _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME); } @Override @@ -116,22 +118,30 @@ public class MailboxSendOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + _dataTableBlockBaseOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); TransferableBlock transferableBlock; try { + _operatorStats.endTimer(); transferableBlock = _dataTableBlockBaseOperator.nextBlock(); + _operatorStats.startTimer(); while (!transferableBlock.isNoOpBlock()) { _exchange.send(transferableBlock); - + _operatorStats.recordInput(1, transferableBlock.getNumRows()); + // The # of 
output block is not accurate because we may do a split in exchange send. + _operatorStats.recordOutput(1, transferableBlock.getNumRows()); if (transferableBlock.isEndOfStreamBlock()) { return transferableBlock; } - + _operatorStats.endTimer(); transferableBlock = _dataTableBlockBaseOperator.nextBlock(); + _operatorStats.startTimer(); } } catch (final Exception e) { // ideally, MailboxSendOperator doesn't ever throw an exception because @@ -143,8 +153,9 @@ public class MailboxSendOperator extends MultiStageOperator { } catch (Exception e2) { LOGGER.error("Exception while sending block to mailbox.", e2); } + } finally { + _operatorStats.endTimer(); } - return transferableBlock; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index 424d7d003f..ae6bae362f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -52,6 +52,7 @@ public class OpChain implements AutoCloseable { return _receivingMailbox; } + // TODO: Move OperatorStats here. public OpChainStats getStats() { return _stats; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java index 58327c40da..07705f8bac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java @@ -34,13 +34,14 @@ public class OpChainStats { // use memoized supplier so that the timing doesn't start until the // first time we get the timer - private final Supplier<ThreadResourceUsageProvider> _exTimer - = Suppliers.memoize(ThreadResourceUsageProvider::new)::get; + private final Supplier<ThreadResourceUsageProvider> _exTimer = + Suppliers.memoize(ThreadResourceUsageProvider::new)::get; // this is used to make sure that toString() doesn't have side // effects (accidentally starting the timer) private volatile boolean _exTimerStarted = false; + private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted(); private final Stopwatch _queuedStopwatch = Stopwatch.createUnstarted(); private final AtomicLong _queuedCount = new AtomicLong(); @@ -62,20 +63,23 @@ public class OpChainStats { if (!_queuedStopwatch.isRunning()) { _queuedStopwatch.start(); } + if (_executeStopwatch.isRunning()) { + _executeStopwatch.stop(); + } } public void startExecutionTimer() { _exTimerStarted = true; _exTimer.get(); + if (!_executeStopwatch.isRunning()) { + _executeStopwatch.start(); + } } @Override public String toString() { - return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", - _id, - _queuedCount.get(), - _exTimerStarted ? TimeUnit.NANOSECONDS.toMillis(_exTimer.get().getThreadTimeNs()) : 0, - _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS) - ); + return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(), + _exTimerStarted ? 
_executeStopwatch.elapsed(TimeUnit.MILLISECONDS) : 0, + _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS)); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java new file mode 100644 index 0000000000..c40b96b3c8 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; + + +public class OperatorStats { + private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted(); + + // TODO: add a operatorId for better tracking purpose. + private final int _stageId; + private final long _requestId; + + private final String _operatorType; + + private int _numInputBlock = 0; + private int _numInputRows = 0; + + private int _numOutputBlock = 0; + + private int _numOutputRows = 0; + + public OperatorStats(long requestId, int stageId, String operatorType) { + _stageId = stageId; + _requestId = requestId; + _operatorType = operatorType; + } + + public void startTimer() { + if (!_executeStopwatch.isRunning()) { + _executeStopwatch.start(); + } + } + + public void endTimer() { + if (_executeStopwatch.isRunning()) { + _executeStopwatch.stop(); + } + } + + public void recordInput(int numBlock, int numRows) { + _numInputBlock += numBlock; + _numInputRows += numRows; + } + + public void recordOutput(int numBlock, int numRows) { + _numOutputBlock += numBlock; + _numOutputRows += numRows; + } + + // TODO: Return the string as a JSON string. 
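// Usage sketch (condensed from the operator changes elsewhere in this commit; these comment
// lines are commentary, not part of OperatorStats.java): callers bracket their own work with
// startTimer()/endTimer(), and stop the clock around blocking upstream nextBlock() calls so
// that each operator's wall time excludes time spent waiting on its input:
//
//   _operatorStats.endTimer();                          // exclude upstream time
//   TransferableBlock block = _upstreamOperator.nextBlock();
//   _operatorStats.startTimer();                        // resume this operator's clock
//   List<Object[]> container = block.getContainer();
//   _operatorStats.recordInput(1, container.size());    // one block, N rows consumed
//   // ...transform the rows...
//   _operatorStats.recordOutput(1, resultRows.size());  // one block, M rows produced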
+ @Override + public String toString() { + return String.format( + "OperatorStats[type: %s, requestId: %s, stageId %s] ExecutionWallTime: %sms, InputRows: %s, InputBlock: " + + "%s, OutputRows: %s, OutputBlock: %s", _operatorType, _requestId, _stageId, + _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numInputRows, _numInputBlock, _numOutputRows, + _numOutputBlock); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index 4ba3dbde94..13f4306e01 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -32,11 +32,15 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SortOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "SORT"; private final MultiStageOperator _upstreamOperator; + private static final Logger LOGGER = LoggerFactory.getLogger(SortOperator.class); + private final int _fetch; private final int _offset; private final DataSchema _dataSchema; @@ -46,17 +50,19 @@ public class SortOperator extends MultiStageOperator { private boolean _readyToConstruct; private boolean _isSortedBlockConstructed; private TransferableBlock _upstreamErrorBlock; + private OperatorStats _operatorStats; public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys, - List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) { + List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema, + long requestId, int stageId) { this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, - SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY); + SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId); } @VisibleForTesting SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema, - int maxHolderCapacity) { + int maxHolderCapacity, long requestId, int stageId) { _upstreamOperator = upstreamOperator; _fetch = fetch; _offset = offset; @@ -68,6 +74,7 @@ public class SortOperator extends MultiStageOperator { : maxHolderCapacity; _rows = new PriorityQueue<>(_numRowsToKeep, new SortComparator(collationKeys, collationDirections, dataSchema, false)); + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } @Override @@ -78,21 +85,27 @@ public class SortOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + _upstreamOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); try { consumeInputBlocks(); return produceSortedBlock(); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); + } finally { + _operatorStats.endTimer(); } } private TransferableBlock produceSortedBlock() { if (_upstreamErrorBlock != null) { + 
LOGGER.error("OperatorStats:" + _operatorStats); return _upstreamErrorBlock; } else if (!_readyToConstruct) { return TransferableBlockUtils.getNoOpTransferableBlock(); @@ -104,6 +117,7 @@ public class SortOperator extends MultiStageOperator { Object[] row = _rows.poll(); rows.addFirst(row); } + _operatorStats.recordOutput(1, rows.size()); _isSortedBlockConstructed = true; if (rows.size() == 0) { return TransferableBlockUtils.getEndOfStreamTransferableBlock(); @@ -117,7 +131,9 @@ public class SortOperator extends MultiStageOperator { private void consumeInputBlocks() { if (!_isSortedBlockConstructed) { + _operatorStats.endTimer(); TransferableBlock block = _upstreamOperator.nextBlock(); + _operatorStats.startTimer(); while (!block.isNoOpBlock()) { // setting upstream error block if (block.isErrorBlock()) { @@ -132,8 +148,10 @@ public class SortOperator extends MultiStageOperator { for (Object[] row : container) { SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep); } - + _operatorStats.endTimer(); block = _upstreamOperator.nextBlock(); + _operatorStats.startTimer(); + _operatorStats.recordInput(1, container.size()); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java index 83db10d4a2..a75ad6b017 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -30,6 +30,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -45,14 +47,16 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; public class TransformOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "TRANSFORM"; private final MultiStageOperator _upstreamOperator; + private static final Logger LOGGER = LoggerFactory.getLogger(TransformOperator.class); private final List<TransformOperand> _transformOperandsList; private final int _resultColumnSize; // TODO: Check type matching between resultSchema and the actual result. 
private final DataSchema _resultSchema; private TransferableBlock _upstreamErrorBlock; + private OperatorStats _operatorStats; public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema, - List<RexExpression> transforms, DataSchema upstreamDataSchema) { + List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) { Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty."); Preconditions.checkState(resultSchema.size() == transforms.size(), "result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size()); @@ -63,6 +67,7 @@ public class TransformOperator extends MultiStageOperator { _transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, upstreamDataSchema)); } _resultSchema = resultSchema; + _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME); } @Override @@ -73,15 +78,23 @@ public class TransformOperator extends MultiStageOperator { @Nullable @Override public String toExplainString() { + _upstreamOperator.toExplainString(); + LOGGER.debug(_operatorStats.toString()); return EXPLAIN_NAME; } @Override protected TransferableBlock getNextBlock() { + _operatorStats.startTimer(); try { - return transform(_upstreamOperator.nextBlock()); + _operatorStats.endTimer(); + TransferableBlock block = _upstreamOperator.nextBlock(); + _operatorStats.startTimer(); + return transform(block); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); + } finally { + _operatorStats.endTimer(); } } @@ -103,11 +116,13 @@ public class TransformOperator extends MultiStageOperator { for (Object[] row : container) { Object[] resultRow = new Object[_resultColumnSize]; for (int i = 0; i < _resultColumnSize; i++) { - resultRow[i] = FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row), - _resultSchema.getColumnDataType(i)); + resultRow[i] = + FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row), _resultSchema.getColumnDataType(i)); } resultRows.add(resultRow); } + _operatorStats.recordInput(1, container.size()); + _operatorStats.recordOutput(1, resultRows.size()); return new TransferableBlock(resultRows, _resultSchema, DataBlock.Type.ROW); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index aaa140463f..d675efea62 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -84,13 +84,14 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), - node.getGroupSet(), node.getInputs().get(0).getDataSchema()); + node.getGroupSet(), node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId); } @Override public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition()); + return new FilterOperator(nextOperator, 
node.getDataSchema(), node.getCondition(), context.getRequestId(), + context.getStageId()); } @Override @@ -101,21 +102,22 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, MultiStageOperator leftOperator = left.visit(this, context); MultiStageOperator rightOperator = right.visit(this, context); - return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node); + return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node, context.getRequestId(), + context.getStageId()); } @Override public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(), - node.getInputs().get(0).getDataSchema()); + node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId()); } @Override public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), - node.getFetch(), node.getOffset(), node.getDataSchema()); + node.getFetch(), node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId()); } @Override @@ -125,6 +127,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, @Override public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) { - return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows()); + return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows(), context.getRequestId(), + context.getStageId()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index 25b7aa5e8b..4cbd4aa624 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -46,12 +46,16 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.roaringbitmap.RoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@code QueryDispatcher} dispatch a query to different workers. 
*/ public class QueryDispatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class); + private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); public QueryDispatcher() { @@ -69,8 +73,14 @@ public class QueryDispatcher { reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(), mailboxService.getMailboxPort(), timeoutMs); List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs); - return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), + mailboxReceiveOperator.toExplainString(); + long toResultTableStartTime = System.currentTimeMillis(); + ResultTable resultTable = toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()); + LOGGER.debug( + "RequestId:" + requestId + " StageId: 0 Broker toResultTable processing time:" + (System.currentTimeMillis() + - toResultTableStartTime) + " ms"); + return resultTable; } public int submit(long requestId, QueryPlan queryPlan, long timeoutMs) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index aa7394a6a8..6f2212e6b6 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -71,7 +71,7 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block1 = operator.nextBlock(); // build @@ -87,12 +87,11 @@ public class AggregateOperatorTest { List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1))); List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0)); - Mockito.when(_input.nextBlock()) - .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block = operator.nextBlock(); @@ -109,13 +108,12 @@ public class AggregateOperatorTest { List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1})) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema 
= new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block @@ -134,12 +132,11 @@ public class AggregateOperatorTest { List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block1 = operator.nextBlock(); @@ -160,12 +157,11 @@ public class AggregateOperatorTest { List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block1 = operator.nextBlock(); @@ -196,9 +192,8 @@ public class AggregateOperatorTest { Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d); Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of( - "SUM", cdt -> merger - )); + AggregateOperator operator = + new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); // (output result) @@ -220,7 +215,7 @@ public class AggregateOperatorTest { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); AggregateOperator sum0GroupBy1 = new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1), - Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema); + Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2); TransferableBlock result = sum0GroupBy1.getNextBlock(); while (result.isNoOpBlock()) { result = sum0GroupBy1.getNextBlock(); @@ -232,20 +227,18 @@ public class AggregateOperatorTest { Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); } - @Test( - expectedExceptions = IllegalStateException.class, - expectedExceptionsMessageRegExp = ".*Unexpected value: AVERAGE.*") 
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Unexpected value: " + + "AVERAGE.*") public void shouldThrowOnUnknownAggFunction() { // Given: List<RexExpression> calls = ImmutableList.of( - new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, "AVERAGE", ImmutableList.of()) - ); + new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, "AVERAGE", ImmutableList.of())); List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE}); DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE}); // When: - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); } @Test @@ -262,7 +255,7 @@ public class AggregateOperatorTest { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2); // When: TransferableBlock block = operator.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java index c53ea2037c..cf5cb80151 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java @@ -61,7 +61,7 @@ public class FilterOperatorTest { DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{ DataSchema.ColumnDataType.BOOLEAN }); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); DataBlock error = errorBlock.getDataBlock(); @@ -76,7 +76,7 @@ public class FilterOperatorTest { DataSchema.ColumnDataType.INT }); Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertTrue(dataBlock.isEndOfStreamBlock()); } @@ -89,7 +89,7 @@ public class FilterOperatorTest { DataSchema.ColumnDataType.INT }); Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertTrue(dataBlock.isNoOpBlock()); } @@ -104,7 +104,7 @@ public class FilterOperatorTest { Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1})) 
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -122,7 +122,7 @@ public class FilterOperatorTest { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -137,7 +137,7 @@ public class FilterOperatorTest { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); DataBlock data = errorBlock.getDataBlock(); @@ -152,7 +152,7 @@ public class FilterOperatorTest { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); DataBlock data = errorBlock.getDataBlock(); @@ -167,7 +167,7 @@ public class FilterOperatorTest { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -187,7 +187,7 @@ public class FilterOperatorTest { RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -207,7 +207,7 @@ public class FilterOperatorTest { RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = 
dataBlock.getContainer(); @@ -229,7 +229,7 @@ public class FilterOperatorTest { RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT", ImmutableList.of(new RexExpression.InputRef(0))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -248,7 +248,7 @@ public class FilterOperatorTest { RexExpression.FunctionCall greaterThan = new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -268,7 +268,7 @@ public class FilterOperatorTest { new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List<Object[]> result = dataBlock.getContainer(); @@ -289,6 +289,6 @@ public class FilterOperatorTest { new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith); + FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index b9ea6b2078..4075237249 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -90,7 +90,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = joinOnString.nextBlock(); while (result.isNoOpBlock()) { @@ -127,7 +127,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = 
joinOnInt.nextBlock(); while (result.isNoOpBlock()) { result = joinOnInt.nextBlock(); @@ -161,7 +161,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); - HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = joinOnInt.nextBlock(); while (result.isNoOpBlock()) { result = joinOnInt.nextBlock(); @@ -202,7 +202,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -236,7 +236,7 @@ public class HashJoinOperatorTest { List<RexExpression> joinClauses = new ArrayList<>(); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -267,7 +267,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -301,7 +301,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -339,7 +339,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -377,7 +377,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -411,7 +411,7 @@ public class HashJoinOperatorTest { }); 
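Every operator constructed in these tests now carries the trailing (requestId, stageId) pair -- the dummy values 1 and 2 above -- which seeds its OperatorStats. The tests also repeat one polling idiom: keep calling nextBlock() while the result is a no-op block. A hedged helper capturing that idiom (the helper itself is not part of this commit; its name is invented for illustration):

    // Hypothetical test helper; mirrors the while-loops in the hunks above.
    // In production the OpChain scheduler re-polls once data becomes available.
    static TransferableBlock awaitNonNoOpBlock(MultiStageOperator op) {
      TransferableBlock block = op.nextBlock();
      while (block.isNoOpBlock()) {
        block = op.nextBlock();
      }
      return block;
    }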
JoinNode node = new JoinNode(1, resultSchema, JoinRelType.RIGHT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = joinOnNum.nextBlock(); while (result.isNoOpBlock()) { result = joinOnNum.nextBlock(); @@ -438,8 +438,7 @@ public class HashJoinOperatorTest { Assert.assertTrue(result.isSuccessfulEndOfStreamBlock()); } - @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not " - + "supported.*") + @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not supported.*") public void shouldThrowOnSemiJoin() { DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING @@ -461,7 +460,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.SEMI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); } @Test @@ -485,7 +484,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.FULL, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -537,7 +536,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.ANTI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); } @Test @@ -562,7 +561,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -595,7 +594,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -631,7 +630,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node); + HashJoinOperator join = new 
HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2); TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block. Assert.assertTrue(result.isNoOpBlock()); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java index 45a04eb2ff..185a6d5d53 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java @@ -52,7 +52,8 @@ public class LeafStageTransferableBlockOperatorTest { new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -66,18 +67,19 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldHandleDesiredDataSchemaConversionCorrectly() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT boolCol, tsCol, boolCol AS newNamedBoolCol FROM tbl"); + QueryContext queryContext = + QueryContextConverterUtils.getQueryContext("SELECT boolCol, tsCol, boolCol AS newNamedBoolCol FROM tbl"); DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "tsCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP}); - DataSchema desiredSchema = new DataSchema(new String[]{"boolCol", "tsCol", "newNamedBoolCol"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP, - DataSchema.ColumnDataType.BOOLEAN}); + DataSchema desiredSchema = + new DataSchema(new String[]{"boolCol", "tsCol", "newNamedBoolCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP, DataSchema.ColumnDataType.BOOLEAN + }); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( - new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1, 1660000000000L}, - new Object[]{0, 1600000000000L})), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, - desiredSchema); + new SelectionResultsBlock(resultSchema, + Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -96,9 +98,10 @@ public class LeafStageTransferableBlockOperatorTest { DataSchema schema = new DataSchema(new String[]{"boolCol", "tsCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP}); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new 
InstanceResponseBlock( - new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1, 1660000000000L}, - new Object[]{0, 1600000000000L})), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + new SelectionResultsBlock(schema, + Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -115,13 +118,15 @@ public class LeafStageTransferableBlockOperatorTest { QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl"); DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); - List<InstanceResponseBlock> resultsBlockList = Arrays.asList( - new InstanceResponseBlock(new SelectionResultsBlock(schema, - Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext), - new InstanceResponseBlock(new SelectionResultsBlock(schema, - Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})), queryContext), + List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new InstanceResponseBlock( + new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), + queryContext), + new InstanceResponseBlock( + new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})), + queryContext), new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock1 = operator.nextBlock(); @@ -145,12 +150,13 @@ public class LeafStageTransferableBlockOperatorTest { new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); InstanceResponseBlock errorBlock = new InstanceResponseBlock(); errorBlock.addException(QueryException.QUERY_EXECUTION_ERROR.getErrorCode(), "foobar"); - List<InstanceResponseBlock> resultsBlockList = Arrays.asList( - new InstanceResponseBlock(new SelectionResultsBlock(schema, - Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext), + List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new InstanceResponseBlock( + new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), + queryContext), errorBlock, new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -162,16 +168,16 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldReorderWhenQueryContextAskForNotInOrderGroupByAsDistinct() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT intCol, strCol FROM tbl GROUP BY strCol, intCol"); + QueryContext queryContext = + 
QueryContextConverterUtils.getQueryContext("SELECT intCol, strCol FROM tbl GROUP BY strCol, intCol"); // result schema doesn't match with DISTINCT columns using GROUP BY. DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); - List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( - new InstanceResponseBlock(new DistinctResultsBlock(mock(DistinctAggregationFunction.class), - new DistinctTable(schema, Arrays.asList( - new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( + new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, + Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext)); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -184,16 +190,15 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldParsedBlocksSuccessfullyWithDistinctQuery() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT DISTINCT strCol, intCol FROM tbl"); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT strCol, intCol FROM tbl"); // result schema doesn't match with DISTINCT columns using GROUP BY. DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); - List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( - new InstanceResponseBlock(new DistinctResultsBlock(mock(DistinctAggregationFunction.class), - new DistinctTable(schema, Arrays.asList( - new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( + new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, + Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext)); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -209,12 +214,15 @@ public class LeafStageTransferableBlockOperatorTest { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY strCol, intCol"); // result schema doesn't match with columns ordering using GROUP BY, this should not occur. 
- DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)", "sum(doubleCol)", "strCol"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, - DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING}); + DataSchema schema = + new DataSchema(new String[]{"intCol", "count(*)", "sum(doubleCol)", "strCol"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, + DataSchema.ColumnDataType.STRING + }); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -230,11 +238,14 @@ public class LeafStageTransferableBlockOperatorTest { + "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING sum(doubleCol) < 10 AND count(*) > 0"); // result schema contains duplicate reference from agg and having. it will repeat itself. DataSchema schema = new DataSchema(new String[]{"strCol", "intCol", "count(*)", "sum(doubleCol)", "sum(doubleCol)"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, - DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG}); + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG + }); List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -246,15 +257,14 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldNotErrorOutWhenDealingWithAggregationResults() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT count(*), sum(doubleCol) FROM tbl"); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT count(*), sum(doubleCol) FROM tbl"); // result schema doesn't match with DISTINCT columns using GROUP BY. 
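Condensing the repeated setup in these LeafStageTransferableBlockOperatorTest hunks into a single fragment (schema, query, and rows are placeholder values copied from one of the tests; 1 and 2 are the dummy requestId/stageId the tests pass):

    // Wrap a server-side results block and pull it through the leaf operator.
    DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
        new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
    LeafStageTransferableBlockOperator operator =
        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
    TransferableBlock resultBlock = operator.nextBlock();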
DataSchema schema = new DataSchema(new String[]{"count_star", "sum(doubleCol)"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG}); - List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( - new InstanceResponseBlock(new AggregationResultsBlock(queryContext.getAggregationFunctions(), - Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock( + new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext)); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -275,8 +285,8 @@ public class LeafStageTransferableBlockOperatorTest { // When: List<InstanceResponseBlock> responseBlockList = Collections.singletonList( new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(responseBlockList, - desiredSchema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -287,19 +297,19 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsDistinct() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT strCol, intCol FROM tbl GROUP BY strCol, intCol"); + QueryContext queryContext = + QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl GROUP BY strCol, intCol"); DataSchema resultSchema = new DataSchema(new String[]{"strCol", "intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING}); DataSchema desiredSchema = new DataSchema(new String[]{"strCol", "intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); // When: - List<InstanceResponseBlock> responseBlockList = Collections.singletonList( - new InstanceResponseBlock(new DistinctResultsBlock(mock(DistinctAggregationFunction.class), + List<InstanceResponseBlock> responseBlockList = Collections.singletonList(new InstanceResponseBlock( + new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(resultSchema, Collections.emptyList())), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(responseBlockList, - desiredSchema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -310,18 +320,18 @@ public class LeafStageTransferableBlockOperatorTest { @Test public void shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsGroupBy() { // Given: - QueryContext queryContext = QueryContextConverterUtils.getQueryContext( - "SELECT strCol, SUM(intCol) FROM tbl GROUP BY strCol"); + QueryContext queryContext = + QueryContextConverterUtils.getQueryContext("SELECT strCol, SUM(intCol) FROM tbl GROUP BY strCol"); DataSchema resultSchema = new DataSchema(new String[]{"strCol", "SUM(intCol)"}, 
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING}); DataSchema desiredSchema = new DataSchema(new String[]{"strCol", "SUM(intCol)"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}); // When: List<InstanceResponseBlock> responseBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext)); - LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(responseBlockList, - desiredSchema); + LeafStageTransferableBlockOperator operator = + new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2); TransferableBlock resultBlock = operator.nextBlock(); // Then: diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java index 663caf2272..856965cfb4 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java @@ -24,13 +24,34 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.PlanRequestContext; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class LiteralValueOperatorTest { + private AutoCloseable _mocks; + + @Mock + private PlanRequestContext _context; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + @Test public void shouldReturnLiteralBlock() { // Given: @@ -44,7 +65,7 @@ public class LiteralValueOperatorTest { new RexExpression.Literal(DataType.STRING, ""), new RexExpression.Literal(DataType.INT, 2)) ); - LiteralValueOperator operator = new LiteralValueOperator(schema, literals); + LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2); // When: TransferableBlock transferableBlock = operator.nextBlock(); @@ -60,7 +81,7 @@ public class LiteralValueOperatorTest { // Given: DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{}); List<List<RexExpression>> literals = ImmutableList.of(ImmutableList.of()); - LiteralValueOperator operator = new LiteralValueOperator(schema, literals); + LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2); // When: TransferableBlock transferableBlock = operator.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 3dc951dca3..9f6577dc66 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -74,7 +74,7 @@ public class MailboxSendOperatorTest { // Given: MailboxSendOperator operator = new MailboxSendOperator( _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory); + server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); @@ -91,7 +91,7 @@ public class MailboxSendOperatorTest { // Given: MailboxSendOperator operator = new MailboxSendOperator( _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory); + server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2); TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")); Mockito.when(_input.nextBlock()) .thenReturn(errorBlock); @@ -109,7 +109,7 @@ public class MailboxSendOperatorTest { // Given: MailboxSendOperator operator = new MailboxSendOperator( _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory); + server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2); Mockito.when(_input.nextBlock()) .thenThrow(new RuntimeException("foo!")); ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class); @@ -128,7 +128,7 @@ public class MailboxSendOperatorTest { // Given: MailboxSendOperator operator = new MailboxSendOperator( _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory); + server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2); TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(); Mockito.when(_input.nextBlock()) .thenReturn(eosBlock); @@ -146,7 +146,7 @@ public class MailboxSendOperatorTest { // Given: MailboxSendOperator operator = new MailboxSendOperator( _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory); + server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2); TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); Mockito.when(_input.nextBlock()) .thenReturn(dataBlock) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java index 2cc6b68a67..b11a00245a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java @@ -64,7 +64,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, 
directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"))); @@ -82,7 +82,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); @@ -99,7 +99,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -116,7 +116,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1})) @@ -139,7 +139,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(1); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1})) @@ -162,7 +162,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"})) @@ -185,7 +185,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.DESCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1})) @@ -208,7 +208,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); 
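The (1, 2) arguments threaded through all of these SortOperatorTest constructors likewise end up in an OperatorStats instance. This section of the diff shows only its call sites (startTimer, endTimer, recordInput, recordOutput), not the class body, so the following is a minimal stand-in sketch: the constructor shape and the four method names come from the diff, while the fields, nanoTime bookkeeping, and toString format are assumptions.

    // Minimal stand-in for OperatorStats; internals are assumed, not from the commit.
    public class OperatorStatsSketch {
      private final long _requestId;
      private final int _stageId;
      private final String _operatorName;
      private long _startNanos;
      private long _executionNanos;
      private int _inputBlocks;
      private int _inputRows;
      private int _outputBlocks;
      private int _outputRows;

      public OperatorStatsSketch(long requestId, int stageId, String operatorName) {
        _requestId = requestId;
        _stageId = stageId;
        _operatorName = operatorName;
      }

      // Resume the clock when the operator starts doing its own work.
      public void startTimer() {
        _startNanos = System.nanoTime();
      }

      // Pause the clock, e.g. right before blocking on the upstream operator.
      public void endTimer() {
        if (_startNanos > 0) {
          _executionNanos += System.nanoTime() - _startNanos;
          _startNanos = 0;
        }
      }

      public void recordInput(int numBlocks, int numRows) {
        _inputBlocks += numBlocks;
        _inputRows += numRows;
      }

      public void recordOutput(int numBlocks, int numRows) {
        _outputBlocks += numBlocks;
        _outputRows += numRows;
      }

      @Override
      public String toString() {
        return "OperatorStats{requestId=" + _requestId + ", stageId=" + _stageId
            + ", operator=" + _operatorName + ", executionMs=" + _executionNanos / 1_000_000
            + ", inBlocks=" + _inputBlocks + ", inRows=" + _inputRows
            + ", outBlocks=" + _outputBlocks + ", outRows=" + _outputRows + '}';
      }
    }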
List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) @@ -231,7 +231,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema); + SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) @@ -253,7 +253,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 2, 0, schema, 1); + SortOperator op = new SortOperator(_input, collation, directions, 2, 0, schema, 1, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) @@ -275,7 +275,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) @@ -296,7 +296,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0); List<Direction> directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{2})) @@ -320,7 +320,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0, 1); List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema); + SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3})) @@ -344,7 +344,7 @@ public class SortOperatorTest { List<RexExpression> collation = collation(0, 1); List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING); DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, 
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -368,7 +368,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}))
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index b89313c25c..52ffff08fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -71,7 +71,7 @@ public class TransformOperatorTest {
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
@@ -95,7 +95,8 @@ public class TransformOperatorTest {
     RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
     RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str");
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
 
     TransferableBlock result = op.nextBlock();
     // Literal operands should just output original literals.
     Assert.assertTrue(!result.isErrorBlock());
@@ -125,7 +126,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
     List<Object[]> resultRows = result.getContainer();
@@ -153,7 +154,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
@@ -173,7 +174,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
     DataBlock data = result.getDataBlock();
@@ -196,7 +198,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
 
     TransferableBlock result = op.nextBlock();
     // First block has two rows
     Assert.assertFalse(result.isErrorBlock());
@@ -227,7 +230,8 @@ public class TransformOperatorTest {
     DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
     });
-    TransformOperator transform = new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema);
+    TransformOperator transform =
+        new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match "
@@ -240,6 +244,6 @@ public class TransformOperatorTest {
     });
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     TransformOperator transform =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2);
   }
 }
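A note on the recurring change above: every operator constructor in these tests gains the same two trailing integer arguments (and the SortOperator overload that already took an extra fetch-size argument becomes "..., 1, 1, 2"). The diff itself does not name them, but since this commit introduces OperatorStats.java for per-operator debuggability, the pair plausibly identifies which request and stage the operator belongs to so its stats can be keyed. The following is a hedged sketch only, not the committed OperatorStats API: the class name, field names, and methods below are assumptions made for illustration.

// Hypothetical sketch -- NOT the committed OperatorStats.java; names and fields
// are assumed. It illustrates the likely role of the two new constructor
// arguments (a request id and a stage id) threaded into every operator above.
import java.util.concurrent.TimeUnit;

public class OperatorStatsSketch {
  private final long _requestId; // e.g. the trailing "1" passed in the tests
  private final int _stageId;    // e.g. the trailing "2" passed in the tests

  private long _startNs;
  private long _totalNs;
  private int _numBlocks;
  private int _numRows;

  public OperatorStatsSketch(long requestId, int stageId) {
    _requestId = requestId;
    _stageId = stageId;
  }

  // Wrap each nextBlock() call with startTimer()/endTimer() to accumulate
  // wall-clock time spent inside this operator.
  public void startTimer() {
    _startNs = System.nanoTime();
  }

  public void endTimer() {
    _totalNs += System.nanoTime() - _startNs;
  }

  // Record throughput per returned block.
  public void recordBlock(int numRows) {
    _numBlocks++;
    _numRows += numRows;
  }

  @Override
  public String toString() {
    return String.format("requestId=%d stageId=%d blocks=%d rows=%d timeMs=%d",
        _requestId, _stageId, _numBlocks, _numRows, TimeUnit.NANOSECONDS.toMillis(_totalNs));
  }
}

Under that assumption, the literal "1, 2" in the tests is simply a dummy requestId/stageId pair, which is why it is identical across every constructor call in the diff.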
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org