walterddr commented on code in PR #10094:
URL: https://github.com/apache/pinot/pull/10094#discussion_r1068799677

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -146,11 +146,6 @@ public final void register(OpChain operatorChain) {
           operatorChain,
           operatorChain.getReceivingMailbox(),
           _scheduler.size());
-
-      // we want to track the time that it takes from registering
-      // an operator chain to when it completes, so make sure to
-      // start the timer here
-      operatorChain.getStats().startExecutionTimer();

Review Comment:
   Changing this means we record only the execution time, not the initial register-to-executing delay, yes?


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -104,6 +113,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     _resultSchema = dataSchema;
     _readyToConstruct = false;
     _hasReturnedAggregateBlock = false;
+    _operatorStats = new OperatorStats(context.getRequestId(), context.getStageId(), EXPLAIN_NAME);

Review Comment:
   In case we have subclasses going forward, let's call toExplainString() instead of using EXPLAIN_NAME directly, b/c a subclass may override the explain string.
   
   ```suggestion
       _operatorStats = new OperatorStats(context.getRequestId(), context.getStageId(), toExplainString());
   ```


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -69,18 +74,22 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
   private boolean _readyToConstruct;
   private boolean _hasReturnedAggregateBlock;
 
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
   // aggCalls has to be a list of FunctionCall and cannot be null
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
   public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet) {
-    this(inputOperator, dataSchema, aggCalls, groupSet, AggregateOperator.Accumulator.MERGERS);
+      List<RexExpression> aggCalls, List<RexExpression> groupSet, PlanRequestContext context) {

Review Comment:
   Can we create the OperatorStats in the PhysicalPlanVisitor, or pass in the requestId and stageId directly? Is any other info needed from the context?


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -160,8 +155,8 @@ public final void register(OpChain operatorChain, boolean isNew) {
           operatorChain, isNew, _scheduler.size());
       _scheduler.register(operatorChain, isNew);
-      operatorChain.getStats().queued();
     } finally {
+      operatorChain.getStats().queued();

Review Comment:
   Can we add a test for this and the change above? I'm not sure I fully understand why this queued() call needs to move into the finally block.


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -120,6 +130,7 @@ public String toExplainString() {
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();

Review Comment:
   So IIUC, this _operatorStats actually holds operator-chain stats, b/c the timer isn't stopped when calling the downstream's nextBlock() method, yes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org
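
To make the last question concrete (a timer started in getNextBlock() that keeps running while the input's nextBlock() executes), here is a minimal standalone sketch of the two behaviours. Every type in it (TimerSketch, StatsTimer, FakeClock, Leaf, Parent) is hypothetical and written only for illustration; none of them is a Pinot class, and the real OperatorStats may work differently. A simulated clock is assumed so the numbers are deterministic.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: none of these types exist in Pinot.
final class TimerSketch {
  /** Accumulates the time elapsed between start() and stop() calls. */
  static final class StatsTimer {
    private final LongSupplier _clock;
    private long _startNs = -1;
    private long _totalNs;

    StatsTimer(LongSupplier clock) {
      _clock = clock;
    }

    void start() {
      if (_startNs < 0) {
        _startNs = _clock.getAsLong();
      }
    }

    void stop() {
      if (_startNs >= 0) {
        _totalNs += _clock.getAsLong() - _startNs;
        _startNs = -1;
      }
    }

    long totalNs() {
      return _totalNs;
    }
  }

  /** Simulated clock that only moves when an operator "does work". */
  static final class FakeClock implements LongSupplier {
    private long _now;

    @Override
    public long getAsLong() {
      return _now;
    }

    void advance(long ns) {
      _now += ns;
    }
  }

  interface Op {
    int nextBlock();
  }

  /** Input operator that spends 70 ns per block. */
  static final class Leaf implements Op {
    private final FakeClock _clock;

    Leaf(FakeClock clock) {
      _clock = clock;
    }

    @Override
    public int nextBlock() {
      _clock.advance(70);
      return 1;
    }
  }

  /** Operator that spends 30 ns of its own work on top of its input. */
  static final class Parent implements Op {
    private final FakeClock _clock;
    private final Op _input;
    private final boolean _exclusive;
    final StatsTimer _timer;

    Parent(FakeClock clock, Op input, boolean exclusive) {
      _clock = clock;
      _input = input;
      _exclusive = exclusive;
      _timer = new StatsTimer(clock);
    }

    @Override
    public int nextBlock() {
      _timer.start();
      try {
        if (_exclusive) {
          _timer.stop(); // pause while the input operator runs
        }
        int block = _input.nextBlock();
        if (_exclusive) {
          _timer.start(); // resume for this operator's own work
        }
        _clock.advance(30);
        return block + 1;
      } finally {
        _timer.stop(); // no-op if the timer is already stopped
      }
    }
  }
}
```

With the timer left running across the input call, Parent reports 100 ns (its own 30 ns plus the input's 70 ns), which reads as a chain-level number. Pausing around the input call leaves only the operator's own 30 ns.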