gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1281500827


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +33,84 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 
60_000L;
-
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = 
CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, 
TimeUnit.MILLISECONDS).build();
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
-  }
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, 
TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
           boolean isFinished = false;
-          boolean returnedErrorBlock = false;
+          TransferableBlock returnedErrorBlock = null;
           Throwable thrown = null;
           try {
             LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if 
(_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
             operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
             TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+            while (!result.isEndOfStreamBlock()) {
               result = operatorChain.getRoot().nextBlock();
             }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
+            isFinished = true;
+            if (result.isErrorBlock()) {
+              returnedErrorBlock = result;
+              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, 
operatorChain.getStats(),
+                  result.getDataBlock().getExceptions());
             } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
-              }
+              LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
             }
           } catch (Exception e) {
             LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
             thrown = e;
           } finally {
-            if (returnedErrorBlock || thrown != null) {
+            if (returnedErrorBlock != null || thrown != null) {
+              if (thrown == null) {
+                String blockMsg;
+                if (returnedErrorBlock.getDataBlock() instanceof 
MetadataBlock) {
+                  MetadataBlock metadataBlock = (MetadataBlock) 
returnedErrorBlock.getDataBlock();
+                  blockMsg = String.join(", ", 
metadataBlock.getExceptions().values());
+                } else {
+                  blockMsg = "Unknown";
+                }
+                thrown = new RuntimeException("Error block " + blockMsg);
+              }
               cancelOpChain(operatorChain, thrown);
             } else if (isFinished) {
               closeOpChain(operatorChain);
             }
           }
         }
       });
-    }
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);

Review Comment:
   Done in 2e4bde2031b0b7ccf694bb4d8927734b4579681e



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to