gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1580748812


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -19,107 +19,295 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.InvocationScope;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
-public abstract class MultiStageOperator implements 
Operator<TransferableBlock>, AutoCloseable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageOperator.class);
+public abstract class MultiStageOperator<K extends Enum<K> & StatMap.Key>
+    implements Operator<TransferableBlock>, AutoCloseable {
 
   protected final OpChainExecutionContext _context;
   protected final String _operatorId;
-  protected final OpChainStats _opChainStats;
+  protected final StatMap<K> _statMap;
   protected boolean _isEarlyTerminated;
 
-  public MultiStageOperator(OpChainExecutionContext context) {
+  public MultiStageOperator(OpChainExecutionContext context, Class<K> 
keyStatClass) {
     _context = context;
     _operatorId = Joiner.on("_").join(getClass().getSimpleName(), 
_context.getStageId(), _context.getServer());
-    _opChainStats = _context.getStats();
     _isEarlyTerminated = false;
+    _statMap = new StatMap<>(keyStatClass);
   }
 
+  protected abstract Logger logger();
+
+  public abstract Type getOperatorType();
+
+  public abstract K getExecutionTimeKey();
+
+  public abstract K getEmittedRowsKey();
+
   @Override
   public TransferableBlock nextBlock() {
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
       throw new EarlyTerminationException("Interrupted while processing next 
block");
     }
+    if (logger().isDebugEnabled()) {
+      logger().debug("Operator {}: Reading next block", _operatorId);
+    }
     try (InvocationScope ignored = 
Tracing.getTracer().createScope(getClass())) {
       TransferableBlock nextBlock;
       if (shouldCollectStats()) {
-        OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, 
_operatorId);
-        operatorStats.startTimer();
+        Stopwatch executeStopwatch = Stopwatch.createStarted();

Review Comment:
   I finally decided to always collect stats, because the conditional collection
was difficult to explain and the cost is negligible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to