Jackie-Jiang commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1281397920


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
        @Override
        public void runJob() {
          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
          Throwable thrown = null;
          try {
            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
              result = operatorChain.getRoot().nextBlock();
            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
            }
          } catch (Exception e) {
            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
            thrown = e;
          } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);
+              }
              cancelOpChain(operatorChain, thrown);
            } else if (isFinished) {
              closeOpChain(operatorChain);
            }
          }
        }
      });
-    }
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);

Review Comment:
   We need to remove the entry when closing/cancelling the `OpChain`
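
   A minimal sketch of what that could look like for `closeOpChain`, assuming the `_submittedOpChainMap` field above (`cancelOpChain` would do the same in its `finally`); not the PR's actual code:

   ```java
   private void closeOpChain(OpChain opChain) {
     try {
       opChain.close();
     } finally {
       // Drop the bookkeeping entry so the map does not keep finished chains around.
       _submittedOpChainMap.remove(opChain.getId());
     }
   }
   ```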



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
        @Override
        public void runJob() {
          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
          Throwable thrown = null;
          try {
            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
              result = operatorChain.getRoot().nextBlock();
            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
            }
          } catch (Exception e) {
            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
            thrown = e;
          } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);
+              }
              cancelOpChain(operatorChain, thrown);
            } else if (isFinished) {
              closeOpChain(operatorChain);
            }
          }
        }
      });
-    }
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
-  /**
-   * Register a new operator chain with the scheduler.
-   *
-   * @param operatorChain the chain to register
-   */
-  public final void register(OpChain operatorChain) {
-    operatorChain.getStats().queued();
-    _scheduler.register(operatorChain);
-    LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", operatorChain,
-        operatorChain.getReceivingMailboxIds(),
-        _scheduler.size());
-  }
-
-  /**
-   * Async cancel a request. Request will not be fully cancelled until the next time opChain is being polled.
-   *
-   * @param requestId requestId to be cancelled.
-   */
-  public final void cancel(long requestId) {
-    _cancelledRequests.put(requestId, requestId);
-  }
-
-  /**
-   * This method should be called whenever data is available for an {@link OpChain} to consume.
-   * Implementations of this method should be idempotent, it may be called in the scenario that no data is available.
-   *
-   * @param opChainId the identifier of the operator chain
-   */
-  public final void onDataAvailable(OpChainId opChainId) {
-    _scheduler.onDataAvailable(opChainId);
-  }
-
-  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
-  public ExecutorService getWorkerPool() {
-    return _workerPool;
+  @Override
+  public void cancel(long requestId) {
+    // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
+    // via query timeout.
+    List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()

Review Comment:
   Using `entrySet().iterator()` to iterate should be cheaper and create less garbage. We also need to remove the entry because there is no guarantee that the future has already started executing.
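
   A sketch of both points against the `_submittedOpChainMap` shown above (assuming the usual `java.util.Iterator`/`Map.Entry` imports; not the PR's actual code):

   ```java
   @Override
   public void cancel(long requestId) {
     // Walk the entry set directly and drop entries as we cancel them, so futures
     // that never started executing are still cleaned up.
     Iterator<Map.Entry<OpChainId, Future<?>>> iterator = _submittedOpChainMap.entrySet().iterator();
     while (iterator.hasNext()) {
       Map.Entry<OpChainId, Future<?>> entry = iterator.next();
       if (entry.getKey().getRequestId() == requestId) {
         entry.getValue().cancel(true);
         iterator.remove();
       }
     }
   }
   ```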



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {

Review Comment:
   (format) The indentation in this file is incorrect



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
        @Override
        public void runJob() {
          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
          Throwable thrown = null;
          try {
            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
              result = operatorChain.getRoot().nextBlock();
            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
            }
          } catch (Exception e) {
            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
            thrown = e;
          } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);
+              }
              cancelOpChain(operatorChain, thrown);
            } else if (isFinished) {
              closeOpChain(operatorChain);
            }
          }
        }
      });
-    }
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
-  /**
-   * Register a new operator chain with the scheduler.
-   *
-   * @param operatorChain the chain to register
-   */
-  public final void register(OpChain operatorChain) {
-    operatorChain.getStats().queued();
-    _scheduler.register(operatorChain);
-    LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", operatorChain,
-        operatorChain.getReceivingMailboxIds(),
-        _scheduler.size());
-  }
-
-  /**
-   * Async cancel a request. Request will not be fully cancelled until the next time opChain is being polled.
-   *
-   * @param requestId requestId to be cancelled.
-   */
-  public final void cancel(long requestId) {
-    _cancelledRequests.put(requestId, requestId);
-  }
-
-  /**
-   * This method should be called whenever data is available for an {@link OpChain} to consume.
-   * Implementations of this method should be idempotent, it may be called in the scenario that no data is available.
-   *
-   * @param opChainId the identifier of the operator chain
-   */
-  public final void onDataAvailable(OpChainId opChainId) {
-    _scheduler.onDataAvailable(opChainId);
-  }
-
-  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
-  public ExecutorService getWorkerPool() {
-    return _workerPool;
+  @Override
+  public void cancel(long requestId) {

Review Comment:
   We probably want a different API to directly cancel an `OpChain` (or `OpChainId`) because that is much cheaper than cancelling a request id.
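
   For example, a per-chain cancel could be a single map operation (sketch only; the method name is hypothetical):

   ```java
   public void cancelOpChain(OpChainId opChainId) {
     // One lookup instead of scanning every submitted OpChain for a matching request id.
     Future<?> future = _submittedOpChainMap.remove(opChainId);
     if (future != null) {
       future.cancel(true);
     }
   }
   ```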



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
        @Override
        public void runJob() {
          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
          Throwable thrown = null;
          try {
            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
              result = operatorChain.getRoot().nextBlock();
            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
            }
          } catch (Exception e) {
            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
            thrown = e;
          } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);

Review Comment:
   ```suggestion
                   thrown = new RuntimeException("Error block: " + 
returnedErrorBlock.getDataBlock().getExceptions());
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -115,12 +115,12 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
 
     long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
         QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-    _reducerScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
-        Executors.newCachedThreadPool(new NamedThreadFactory("query_broker_reducer_" + _reducerPort + "_port")));
-    _mailboxService = new MailboxService(_reducerHostname, _reducerPort, config, _reducerScheduler::onDataAvailable);
+    //TODO: make this configurable
+    _opChainExecutor = new OpChainExecutor(new NamedThreadFactory("op_chain_worker_on_" + _reducerPort + "_port"));

Review Comment:
   It is not closed in `shutDown()`
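
   i.e. something along these lines in the handler's shutdown path (a sketch only; the surrounding shutdown logic in `MultiStageBrokerRequestHandler` is elided):

   ```java
   @Override
   public void shutDown() {
     // ... existing shutdown steps ...
     // Release the OpChain worker pool created in the constructor; OpChainExecutor is AutoCloseable.
     _opChainExecutor.close();
   }
   ```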



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/SchedulerService.java:
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
+public interface SchedulerService {

Review Comment:
   +1



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/OpChainExecutor.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OpChainExecutor implements ExecutorService, AutoCloseable {

Review Comment:
   I don't really see much value in this wrapper class. Suggest removing it and directly creating an `ExecutorService`.
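
   e.g. the call site in the broker handler could then just create the pool directly (sketch only):

   ```java
   // Plain cached thread pool with the same named threads, no wrapper class (illustrative only).
   ExecutorService opChainExecutor =
       Executors.newCachedThreadPool(new NamedThreadFactory("op_chain_worker_on_" + _reducerPort + "_port"));
   ```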



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
        @Override
        public void runJob() {
          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
          Throwable thrown = null;
          try {
            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
              result = operatorChain.getRoot().nextBlock();
            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
            }
          } catch (Exception e) {
            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
            thrown = e;
          } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);
+              }
              cancelOpChain(operatorChain, thrown);
            } else if (isFinished) {
              closeOpChain(operatorChain);
            }
          }
        }
      });
-    }
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
-  /**
-   * Register a new operator chain with the scheduler.
-   *
-   * @param operatorChain the chain to register
-   */
-  public final void register(OpChain operatorChain) {
-    operatorChain.getStats().queued();
-    _scheduler.register(operatorChain);
-    LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", operatorChain,
-        operatorChain.getReceivingMailboxIds(),
-        _scheduler.size());
-  }
-
-  /**
-   * Async cancel a request. Request will not be fully cancelled until the next time opChain is being polled.
-   *
-   * @param requestId requestId to be cancelled.
-   */
-  public final void cancel(long requestId) {
-    _cancelledRequests.put(requestId, requestId);
-  }
-
-  /**
-   * This method should be called whenever data is available for an {@link OpChain} to consume.
-   * Implementations of this method should be idempotent, it may be called in the scenario that no data is available.
-   *
-   * @param opChainId the identifier of the operator chain
-   */
-  public final void onDataAvailable(OpChainId opChainId) {
-    _scheduler.onDataAvailable(opChainId);
-  }
-
-  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
-  public ExecutorService getWorkerPool() {
-    return _workerPool;
+  @Override
+  public void cancel(long requestId) {
+    // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
+    // via query timeout.
+    List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()
+        .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList());
+    for (OpChainId opChainId : opChainIdsToCancel) {
+      Future<?> future = _submittedOpChainMap.get(opChainId);
+      if (future != null) {
+        future.cancel(true);
+      }
+    }
   }
 
   private void closeOpChain(OpChain opChain) {
-    try {
       opChain.close();
-    } finally {
-      _scheduler.deregister(opChain);
-    }
   }
 
-  private void cancelOpChain(OpChain opChain, Throwable e) {
-    try {
-      opChain.cancel(e);
-    } finally {
-      _scheduler.deregister(opChain);
-    }
+  private void cancelOpChain(OpChain opChain, @Nullable Throwable t) {

Review Comment:
   With the null handling above, `t` will no longer be `null`, and we can remove the annotation and the special null handling in all the subsequent classes.
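
   i.e. combined with the entry-removal suggestion above, the helper could end up as simple as (sketch only):

   ```java
   private void cancelOpChain(OpChain opChain, Throwable t) {
     // t is always non-null here, so no @Nullable and no null checks downstream.
     try {
       opChain.cancel(t);
     } finally {
       _submittedOpChainMap.remove(opChain.getId());
     }
   }
   ```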



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

