gortiz commented on code in PR #12704: URL: https://github.com/apache/pinot/pull/12704#discussion_r1580748812
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java: ########## @@ -19,107 +19,295 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import java.io.DataInput; +import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class); +public abstract class MultiStageOperator<K extends Enum<K> & StatMap.Key> + implements Operator<TransferableBlock>, AutoCloseable { protected final OpChainExecutionContext _context; protected final String _operatorId; - protected final OpChainStats _opChainStats; + protected final StatMap<K> _statMap; protected boolean _isEarlyTerminated; - public MultiStageOperator(OpChainExecutionContext context) { + public MultiStageOperator(OpChainExecutionContext context, Class<K> keyStatClass) { _context = context; _operatorId = Joiner.on("_").join(getClass().getSimpleName(), 
_context.getStageId(), _context.getServer()); - _opChainStats = _context.getStats(); _isEarlyTerminated = false; + _statMap = new StatMap<>(keyStatClass); } + protected abstract Logger logger(); + + public abstract Type getOperatorType(); + + public abstract K getExecutionTimeKey(); + + public abstract K getEmittedRowsKey(); + @Override public TransferableBlock nextBlock() { if (Tracing.ThreadAccountantOps.isInterrupted()) { throw new EarlyTerminationException("Interrupted while processing next block"); } + if (logger().isDebugEnabled()) { + logger().debug("Operator {}: Reading next block", _operatorId); + } try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) { TransferableBlock nextBlock; if (shouldCollectStats()) { - OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId); - operatorStats.startTimer(); + Stopwatch executeStopwatch = Stopwatch.createStarted(); Review Comment: I've finally decided to always collect stats, because the conditional collection (the `shouldCollectStats()` check) was difficult to explain and the cost of always collecting is negligible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org