agavra commented on code in PR #9836:
URL: https://github.com/apache/pinot/pull/9836#discussion_r1037342781


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -63,4 +63,9 @@ public boolean isInitialized() {
   public boolean isClosed() {
     return _closed && _queue.size() == 0;
   }
+
+  @Override
+  public void close() {

Review Comment:
   should we clear the queue here?

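A minimal sketch of what clearing on `close()` could look like, assuming the mailbox buffers blocks in a `java.util.Queue`, as the `_queue` field suggests. `SketchReceivingMailbox` and its methods are hypothetical and are not Pinot's `InMemoryReceivingMailbox`. Blocks are plain `String`s for brevity:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for an in-memory receiving mailbox (not Pinot's class).
// Blocks are plain Strings; every method synchronizes on the mailbox so that
// close() cannot interleave with offer().
class SketchReceivingMailbox {
  private final Queue<String> _queue = new ArrayDeque<>();
  private boolean _closed = false;

  synchronized void offer(String block) {
    if (!_closed) {
      _queue.offer(block); // blocks arriving after close() are dropped
    }
  }

  synchronized String receive() {
    return _queue.poll(); // null when nothing is buffered
  }

  // Same condition as isClosed() in the diff: closed AND drained.
  synchronized boolean isClosed() {
    return _closed && _queue.isEmpty();
  }

  synchronized void close() {
    _closed = true;
    _queue.clear(); // release buffered blocks right away
  }
}
```

With `isClosed()` defined as in the diff (`_closed && _queue.size() == 0`), omitting the `clear()` leaves the mailbox reporting not-closed, and still holding its blocks, until a consumer drains every buffered block.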


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -130,7 +130,9 @@ public void onError(Throwable e) {
 
   @Override
   public void onCompleted() {
-    _isCompleted.set(true);
-    _responseObserver.onCompleted();
+    if(!_isCompleted.get()){
+      _isCompleted.set(true);

Review Comment:
   there's a race condition here (another thread can run between `get()` and `set(true)`); it should be a single atomic check, e.g. `if (!_isCompleted.compareAndSet(false, true)) { return; }`

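For reference, here is a minimal sketch of the atomic guard. `SketchContentObserver` is hypothetical. `completionCount` stands in for the delegated `_responseObserver.onCompleted()` call so that its effect can be counted:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical observer; completionCount stands in for the delegated
// _responseObserver.onCompleted() call.
class SketchContentObserver {
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
  final AtomicInteger completionCount = new AtomicInteger();

  void onCompleted() {
    // compareAndSet checks and flips the flag in one atomic step, so exactly
    // one caller wins even if several threads race here. A separate get()
    // followed by set(true) would let two threads both observe false.
    if (!_isCompleted.compareAndSet(false, true)) {
      return; // already completed by another caller
    }
    completionCount.incrementAndGet();
  }
}
```

`compareAndSet(false, true)` returns `true` only for the caller that actually flips the flag, so the check and the update cannot be separated.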


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java:
##########
@@ -83,6 +87,7 @@ private void shutdown() {
 
   @Override
   public void onCompleted() {
+    finishLatch.countDown();

Review Comment:
   nit: maybe we should move this and `_isCompleted.set(true)` into `shutdown` so that `onError` and `onCompleted` have the same behavior.

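A sketch of that refactor, assuming `shutdown()` is the shared terminal step. `SketchStatusObserver` is a hypothetical stand-in; only its field names echo the diff:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical status observer; only the field names echo the diff.
class SketchStatusObserver {
  private final CountDownLatch _finishLatch = new CountDownLatch(1);
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);

  void onError(Throwable t) {
    // error reporting would go here; state changes live in shutdown()
    shutdown();
  }

  void onCompleted() {
    shutdown();
  }

  // Single place that marks the stream as finished, so both terminal
  // callbacks leave the observer in the same state.
  private void shutdown() {
    _isCompleted.set(true);
    _finishLatch.countDown(); // no effect once the count is already zero
  }

  boolean isCompleted() {
    return _isCompleted.get();
  }

  boolean isFinished() {
    return _finishLatch.getCount() == 0;
  }
}
```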


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();

Review Comment:
   nit: let's keep both log statements (though it looks like I forgot to change this one to `debug`!)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java:
##########
@@ -18,36 +18,58 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 
 
 public class PlanRequestContext {
   protected final MailboxService<TransferableBlock> _mailboxService;
   protected final long _requestId;
-  protected final int _stageId;
   protected final String _hostName;
   protected final int _port;
   protected final Map<Integer, StageMetadata> _metadataMap;
+  // TODO: Add exchange map if multiple exchanges are needed.
+  BlockExchange _exchange;

Review Comment:
   I think it breaks some abstraction barriers to allow any piece of code that 
has access to the `PlanRequestContext` to exchange blocks via a 
`BlockExchange`. Only the `MailboxSendOperator` should be able to send blocks 
IMO - otherwise it can be difficult to debug the ordering of events that are 
sent.

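One way to keep that barrier is sketched below with simplified types. `SketchExchange`, `SketchPlanContext`, and `SketchSendOperator` are hypothetical illustrations, not Pinot classes. The context carries only request metadata, and the exchange is a private field of the send operator:

```java
import java.util.Iterator;

// Hypothetical exchange; in the sketch a block is just a String.
interface SketchExchange {
  void send(String block);
}

// Hypothetical request context: request-scoped metadata only, no exchange,
// so code holding the context cannot send blocks.
final class SketchPlanContext {
  private final long _requestId;

  SketchPlanContext(long requestId) {
    _requestId = requestId;
  }

  long getRequestId() {
    return _requestId;
  }
}

// Hypothetical send operator: the only holder of the exchange, so every
// outgoing block (and its ordering) goes through this one class.
final class SketchSendOperator {
  private final Iterator<String> _input;
  private final SketchExchange _exchange;

  SketchSendOperator(Iterator<String> input, SketchExchange exchange) {
    _input = input;
    _exchange = exchange;
  }

  // Sends the next input block; returns false once the input is exhausted.
  boolean sendNext() {
    if (!_input.hasNext()) {
      return false;
    }
    _exchange.send(_input.next());
    return true;
  }
}
```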


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -35,9 +36,12 @@ public class OpChain {
   // TODO: build timers that are partial-execution aware
   private final Supplier<ThreadResourceUsageProvider> _timer;
 
-  public OpChain(Operator<TransferableBlock> root) {
-    _root = root;
+  // TODO: refactor this into OpChainContext
+  public PlanRequestContext _context;

Review Comment:
   nit: `private final`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();
               }
             } catch (Exception e) {
-              LOGGER.error("Failed to execute query!", e);
+              operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));

Review Comment:
   this breaks some abstraction boundaries - this scheduler service should know 
nothing about the exchange or sending blocks; instead we should consider adding 
this to `MailboxSendOperator` (which is always the root operator for these 
chains). FWIW, I think that's already the case.

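A minimal sketch of that alternative, assuming the root send operator pulls blocks from its child. The operator converts the child's exception into an error block and sends it through its own exchange, so the scheduler never sees the exchange. `SketchBlock` and `SketchRootSendOperator` are hypothetical; real code would use `TransferableBlock` and `TransferableBlockUtils.getErrorTransferableBlock(e)` as in the diff.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical block type: a data payload, or an error wrapping an exception.
final class SketchBlock {
  final String payload;
  final Exception error;

  private SketchBlock(String payload, Exception error) {
    this.payload = payload;
    this.error = error;
  }

  static SketchBlock data(String payload) {
    return new SketchBlock(payload, null);
  }

  static SketchBlock error(Exception e) {
    return new SketchBlock(null, e);
  }

  boolean isError() {
    return error != null;
  }
}

// Hypothetical root operator: the only place that talks to the exchange.
final class SketchRootSendOperator {
  private final Supplier<SketchBlock> _child;    // pulls the next block from the child
  private final Consumer<SketchBlock> _exchange; // sends a block downstream

  SketchRootSendOperator(Supplier<SketchBlock> child, Consumer<SketchBlock> exchange) {
    _child = child;
    _exchange = exchange;
  }

  // Returns the block that was sent. A failure in the child becomes an error
  // block instead of escaping to the scheduler.
  SketchBlock nextBlock() {
    SketchBlock block;
    try {
      block = _child.get();
    } catch (Exception e) {
      block = SketchBlock.error(e);
    }
    _exchange.accept(block);
    return block;
  }
}
```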


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();
               }
             } catch (Exception e) {
-              LOGGER.error("Failed to execute query!", e);
+              operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));
+              // TODO: pass this error through context.
+              operatorChain.getRoot().close();

Review Comment:
   consider pushing this into an `OpChain#close` method; we can also call that method from the scheduler in situations where the scheduler leaks operator chains (this should never happen if errors are propagated properly, but it's good to be defensive here).
   
   even better would be a `scheduler.unregister(OpChain)` method that we call here; that way we can also use it to clean up things like tracked mailboxes (see my comment on #9887).

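A sketch of both ideas using hypothetical `SketchOpChain` and `SketchScheduler` types, not Pinot's `OpChain` or scheduler. `close()` is idempotent, so defensive cleanup can call it safely, and `unregister` is the single cleanup hook:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical op chain: close() is idempotent, so the normal path and any
// defensive cleanup can both call it.
final class SketchOpChain {
  private final Runnable _closeRoot; // stands in for getRoot().close()
  private final AtomicBoolean _closed = new AtomicBoolean(false);

  SketchOpChain(Runnable closeRoot) {
    _closeRoot = closeRoot;
  }

  void close() {
    if (_closed.compareAndSet(false, true)) {
      _closeRoot.run();
    }
  }

  boolean isClosed() {
    return _closed.get();
  }
}

// Hypothetical scheduler that tracks live chains. unregister() is the single
// cleanup hook, where other per-chain state (e.g. tracked mailboxes) could
// also be released.
final class SketchScheduler {
  private final Set<SketchOpChain> _live = ConcurrentHashMap.newKeySet();

  void register(SketchOpChain chain) {
    _live.add(chain);
  }

  void unregister(SketchOpChain chain) {
    _live.remove(chain);
    chain.close();
  }

  // Defensive sweep for chains that were never unregistered.
  void closeAll() {
    for (SketchOpChain chain : _live) {
      unregister(chain);
    }
  }

  int liveCount() {
    return _live.size();
  }
}
```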


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -33,204 +33,204 @@ public class QueryTestSet {
   public Object[][] provideTestSql() {
     return new Object[][]{
         // Order BY LIMIT
-        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 10"},
-        new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 1, 2"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 2 OFFSET 1"},
-
-        // No match filter
-        new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},
-
-        // Hybrid table
+//        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},

Review Comment:
   (reminder) I know this is a draft, but let's make sure these pass and 
uncomment them (or delete them if we don't want them anymore)



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java:
##########
@@ -63,6 +63,12 @@ protected BaseResultsBlock getNextBlock() {
     return _childOperator.nextBlock();
   }
 
+  @Override
+  public void close()

Review Comment:
   (suggestion) maybe this should be the default implementation in `BaseOperator`? (That would make the review a bit easier.)

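A sketch of such a default, assuming the base class can enumerate its children. `SketchBaseOperator`, `SketchLeaf`, and `SketchParent` are hypothetical and are not Pinot's `BaseOperator`. With this default, subclasses override `close()` only when they hold their own resources:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical base class (not Pinot's BaseOperator).
abstract class SketchBaseOperator implements AutoCloseable {
  // Children of this operator in the plan tree; leaves return an empty list.
  protected List<SketchBaseOperator> getChildOperators() {
    return Collections.emptyList();
  }

  // Default behavior: propagate close() down the operator tree.
  @Override
  public void close() {
    for (SketchBaseOperator child : getChildOperators()) {
      child.close();
    }
  }
}

// Hypothetical leaf that owns a resource, so it overrides close().
final class SketchLeaf extends SketchBaseOperator {
  boolean closed = false;

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical parent that relies on the inherited default close().
final class SketchParent extends SketchBaseOperator {
  private final List<SketchBaseOperator> _children;

  SketchParent(List<SketchBaseOperator> children) {
    _children = children;
  }

  @Override
  protected List<SketchBaseOperator> getChildOperators() {
    return _children;
  }
}
```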


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -76,14 +77,13 @@ protected void run()
         if (!isRunning()) {
           return;
         }
-
         OpChain operatorChain = _scheduler.next();
         _workerPool.submit(new TraceRunnable() {
           @Override
-          public void runJob() {
+          public void runJob()
+              throws InterruptedException {

Review Comment:
   note: this should never throw, as the worker pool threads will just die and we'll be left with a dangling worker pool (these issues are really tough to debug). Instead, let's catch any exceptions and handle them here.

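A minimal sketch of the catch-and-handle pattern. It uses a plain `ExecutorService` instead of Pinot's `TraceRunnable`. `SketchSafeJob` and its `onFailure` callback are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical wrapper: the job body may fail, but run() never lets an
// exception escape onto the worker thread.
final class SketchSafeJob implements Runnable {
  interface Body {
    void run() throws Exception;
  }

  private final Body _body;
  private final Consumer<Throwable> _onFailure; // e.g. log and close the op chain

  SketchSafeJob(Body body, Consumer<Throwable> onFailure) {
    _body = body;
    _onFailure = onFailure;
  }

  @Override
  public void run() {
    try {
      _body.run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      _onFailure.accept(e);
    } catch (Exception e) {
      _onFailure.accept(e);
    }
  }
}
```

In a plain `ThreadPoolExecutor`, an exception escaping a task passed to `execute` terminates that worker thread. An exception from a task passed to `submit` is held silently in the returned `Future`. In neither case is the failure handled, so wrapping the body makes the handling explicit.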


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

