Jackie-Jiang commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1286653961


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -81,9 +84,10 @@ public void cancel(Throwable t) {
         _contentObserver = getContentObserver();
       }
       try {
+        String msg = t != null ? t.getMessage() : "Unknown";

Review Comment:
   (minor) Is this needed?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/ExecutorServiceUtils.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExecutorServiceUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorServiceUtils.class);
+
+  private ExecutorServiceUtils() {
+  }
+
+  public static ExecutorService createDefault(String baseName) {
+    return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
+  }
+
+  public static ExecutorService create(PinotConfiguration conf, String confPrefix, String baseName) {
+    //TODO: make this configurable
+    return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
+  }
+
+  public static void close(ExecutorService executorService) {

Review Comment:
   (minor) Let's pass in the timeout
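   
   A minimal sketch of what that could look like; the two-arg signature and the forced-shutdown fallback are assumptions here, not part of the PR:
   ```java
   public static void close(ExecutorService executorService, long timeoutMs) {
     executorService.shutdown();
     try {
       // Wait up to the caller-supplied timeout for in-flight tasks to finish
       if (!executorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
         // Force-cancel whatever is still running after the timeout
         executorService.shutdownNow();
       }
     } catch (InterruptedException e) {
       executorService.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
   ```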



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -173,30 +165,32 @@ public void cancel(long requestId) {
 
   @VisibleForTesting
   public ExecutorService getQueryWorkerLeafExecutorService() {
-    return _queryWorkerLeafExecutorService;
+    return _opChainExecutor;
   }
 
   @VisibleForTesting
   public ExecutorService getQueryWorkerIntermExecutorService() {
-    return _queryWorkerIntermExecutorService;
+    return _opChainExecutor;
   }

Review Comment:
   (minor) Can we clean this up since there is only one executor?
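   
   For example (sketch; the merged getter name is just a placeholder):
   ```java
     @VisibleForTesting
     public ExecutorService getOpChainExecutorService() {
       return _opChainExecutor;
     }
   ```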



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,46 +118,52 @@ public String toExplainString() {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    boolean canContinue = true;
     TransferableBlock transferableBlock;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId());
+    }
     try {
       transferableBlock = _sourceOperator.nextBlock();
-      if (transferableBlock.isNoOpBlock()) {
-        return transferableBlock;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
-          // Stats need to be populated here because the block is being sent to the mailbox
-          // and the receiving opChain will not be able to access the stats from the previous opChain
-          TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
-              OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
-          sendTransferableBlock(eosBlockWithStats);
-        } else {
-          sendTransferableBlock(transferableBlock);
-        }
-      } else { // normal blocks
-        // check whether we should continue depending on exchange queue condition.
-        canContinue = sendTransferableBlock(transferableBlock);
+      if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+        // Stats need to be populated here because the block is being sent to the mailbox
+        // and the receiving opChain will not be able to access the stats from the previous opChain
+        TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
+            OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+        sendTransferableBlock(eosBlockWithStats, false);
+      } else {
+        sendTransferableBlock(transferableBlock, true);
       }
     } catch (Exception e) {
       transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
       try {
         LOGGER.error("Exception while transferring data on opChain: " + 
_context.getId(), e);
-        sendTransferableBlock(transferableBlock);
+        sendTransferableBlock(transferableBlock, false);
       } catch (Exception e2) {
         LOGGER.error("Exception while sending error block.", e2);
       }
     }
-    // yield if we cannot continue to put transferable block into the sending queue
-    return canContinue ? transferableBlock : TransferableBlockUtils.getNoOpTransferableBlock();
+    return transferableBlock;
   }
 
-  private boolean sendTransferableBlock(TransferableBlock block)
+  private void sendTransferableBlock(TransferableBlock block, boolean throwIfTimeout)

Review Comment:
   Should we always throw on timeout?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -44,6 +45,7 @@ public MultiStageOperator(OpChainExecutionContext context) {
     _opChainStats = _context.getStats();
   }
 
+  @Nonnull

Review Comment:
   (minor) Since the multi-stage operator never returns a `null` block (only a very few single-stage operators can return `null`), I suggest not changing the `Operator` interface right now because that would introduce quite a few warnings in the single-stage engine



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java:
##########
@@ -86,13 +86,14 @@ protected TransferableBlock getNextBlock() {
     try {
       TransferableBlock block = _upstreamOperator.nextBlock();
       return transform(block);
-    } catch (Exception e) {
+    } catch (RuntimeException e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
-  private TransferableBlock transform(TransferableBlock block)
-      throws Exception {
+  private TransferableBlock transform(TransferableBlock block) {
+    // TODO: Other operators keep the first erroneous block, while this one keeps the last.
+    //  We should decide what we want to do and be consistent with that.

Review Comment:
   I guess it doesn't matter because the execution will terminate after the first error block is returned?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -113,14 +113,11 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
         new WorkerManager(_reducerHostname, _reducerPort, routingManager), _tableCache);
     _queryDispatcher = new QueryDispatcher();
 
-    long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
-        QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-    _reducerScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
-        Executors.newCachedThreadPool(new NamedThreadFactory("query_broker_reducer_" + _reducerPort + "_port")));
-    _mailboxService = new MailboxService(_reducerHostname, _reducerPort, config, _reducerScheduler::onDataAvailable);
+    _opChainExecutor = ExecutorServiceUtils.create(config, "multiStage", "multiStage_on_" + _reducerPort + "port");

Review Comment:
   (minor) Shall we keep the same thread name, or is it intentional to change it?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -188,4 +193,10 @@ private boolean isType(MetadataBlock.MetadataBlockType type) {
     MetadataBlock metadata = (MetadataBlock) _dataBlock;
     return metadata.getType() == type;
   }
+
+  @Override
+  public String toString() {
+    String blockType = isErrorBlock() ? "error" : 
isSuccessfulEndOfStreamBlock() ? "eos" : "data";
+    return "TransferableBlock{blockType=" + blockType + ", _numRows=" + 
_numRows + ", _container=" + _container + '}';

Review Comment:
   We should not include `_container` here. It could be huge
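   
   For example, a sketch that reports only the row count:
   ```java
     @Override
     public String toString() {
       String blockType = isErrorBlock() ? "error" : isSuccessfulEndOfStreamBlock() ? "eos" : "data";
       // Omit _container; it can hold an arbitrarily large number of rows
       return "TransferableBlock{blockType=" + blockType + ", _numRows=" + _numRows + '}';
     }
   ```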



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java:
##########
@@ -86,13 +86,14 @@ protected TransferableBlock getNextBlock() {
     try {
       TransferableBlock block = _upstreamOperator.nextBlock();
       return transform(block);
-    } catch (Exception e) {
+    } catch (RuntimeException e) {

Review Comment:
   Should we catch it or just throw it out?
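   
   If we throw it out, the method could shrink to something like this (sketch; the caller would then be responsible for converting the exception into an error block):
   ```java
     @Override
     protected TransferableBlock getNextBlock() {
       // Let a RuntimeException propagate instead of wrapping it here
       return transform(_upstreamOperator.nextBlock());
     }
   ```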



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/ExecutorServiceUtils.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExecutorServiceUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorServiceUtils.class);
+
+  private ExecutorServiceUtils() {
+  }
+
+  public static ExecutorService createDefault(String baseName) {
+    return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
+  }
+
+  public static ExecutorService create(PinotConfiguration conf, String confPrefix, String baseName) {
+    //TODO: make this configurable
+    return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
+  }

Review Comment:
   (minor) I feel this is not general enough to be a utility class, for two reasons:
   - It always creates a cached thread pool
   - We are not using any config
   
   Suggest directly creating the executor instead of using this method
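   
   For example, at the broker call site (sketch; the thread-name prefix is taken from the MultiStageBrokerRequestHandler hunk above):
   ```java
       _opChainExecutor = Executors.newCachedThreadPool(
           new NamedThreadFactory("multiStage_on_" + _reducerPort + "port"));
   ```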



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -167,8 +167,13 @@ public boolean isSuccessfulEndOfStreamBlock() {
   /**
    * @return whether this block represents a NOOP block
    */
+  @Deprecated(forRemoval = true)
   public boolean isNoOpBlock() {
-    return isType(MetadataBlock.MetadataBlockType.NOOP);
+    return false;

Review Comment:
   (minor) Remember to clean this up



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +32,73 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times larger than
-   * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
 
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
-  }
-
-  @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
-        @Override
-        public void runJob() {
-          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
-          Throwable thrown = null;
-          try {
-            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if (_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
-            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the execution
-            // of this to another worker
-            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
-              result = operatorChain.getRoot().nextBlock();
-            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
-            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
-              }
-            }
-          } catch (Exception e) {
-            LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
-            thrown = e;
-          } finally {
-            if (returnedErrorBlock || thrown != null) {
-              cancelOpChain(operatorChain, thrown);
-            } else if (isFinished) {
-              closeOpChain(operatorChain);
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+      @Override
+      public void runJob() {
+        boolean isFinished = false;
+        TransferableBlock returnedErrorBlock = null;
+        Throwable thrown = null;
+        try {
+          LOGGER.trace("({}): Executing", operatorChain);
+          operatorChain.getStats().executing();
+          TransferableBlock result = operatorChain.getRoot().nextBlock();
+          while (!result.isEndOfStreamBlock()) {
+            result = operatorChain.getRoot().nextBlock();
+          }
+          isFinished = true;
+          if (result.isErrorBlock()) {
+            returnedErrorBlock = result;
+            LOGGER.error("({}): Completed erroneously {} {}", operatorChain, 
operatorChain.getStats(),
+                result.getDataBlock().getExceptions());
+          } else {
+            LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
+          }
+        } catch (Exception e) {
+          LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
+          thrown = e;
+        } finally {
+          _submittedOpChainMap.remove(operatorChain.getId());
+          if (returnedErrorBlock != null || thrown != null) {
+            if (thrown == null) {
+              thrown = new RuntimeException("Error block " + 
returnedErrorBlock.getDataBlock().getExceptions());
             }
+            operatorChain.cancel(thrown);
+          } else if (isFinished) {
+            operatorChain.close();
           }
         }
-      });
-    }
-  }
-
-  /**
-   * Register a new operator chain with the scheduler.
-   *
-   * @param operatorChain the chain to register
-   */
-  public final void register(OpChain operatorChain) {
-    operatorChain.getStats().queued();
-    _scheduler.register(operatorChain);
-    LOGGER.debug("({}): Scheduler is now handling operator chain listening to 
mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", 
operatorChain,
-        operatorChain.getReceivingMailboxIds(),
-        _scheduler.size());
-  }
-
-  /**
-   * Async cancel a request. Request will not be fully cancelled until the next time opChain is being polled.
-   *
-   * @param requestId requestId to be cancelled.
-   */
-  public final void cancel(long requestId) {
-    _cancelledRequests.put(requestId, requestId);
-  }
-
-  /**
-   * This method should be called whenever data is available for an {@link OpChain} to consume.
-   * Implementations of this method should be idempotent, it may be called in the scenario that no data is available.
-   *
-   * @param opChainId the identifier of the operator chain
-   */
-  public final void onDataAvailable(OpChainId opChainId) {
-    _scheduler.onDataAvailable(opChainId);
-  }
-
-  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
-  public ExecutorService getWorkerPool() {
-    return _workerPool;
-  }
-
-  private void closeOpChain(OpChain opChain) {
-    try {
-      opChain.close();
-    } finally {
-      _scheduler.deregister(opChain);
-    }
+      }
+    });
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
-  private void cancelOpChain(OpChain opChain, Throwable e) {
-    try {
-      opChain.cancel(e);
-    } finally {
-      _scheduler.deregister(opChain);
+  @Override
+  public void cancel(long requestId) {
+    // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
+    // via query timeout.
+    List<Map.Entry<OpChainId, Future<?>>> entries =
+        _submittedOpChainMap.entrySet().stream().filter(entry -> entry.getKey().getRequestId() == requestId)
+            .collect(Collectors.toList());
+    for (Map.Entry<OpChainId, Future<?>> entry : entries) {
+      OpChainId key = entry.getKey();
+      Future<?> future = entry.getValue();
+      if (future != null) {
+        future.cancel(true);
+      }
+      _submittedOpChainMap.remove(key);

Review Comment:
   (minor) This can avoid several map accesses
   ```suggestion
       Iterator<Map.Entry<OpChainId, Future<?>>> iterator = _submittedOpChainMap.entrySet().iterator();
       while (iterator.hasNext()) {
         Map.Entry<OpChainId, Future<?>> entry = iterator.next();
         if (entry.getKey().getRequestId() == requestId) {
           entry.getValue().cancel(true);
           iterator.remove();
         }
       }
   ```
   
   Not introduced in this PR, but can there be a race condition where a query is cancelled first, and then one of its OpChains is registered?
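   
   One possible guard, as a sketch: it mirrors the `_cancelledRequests` cache this PR removes, and in practice the entries would need to expire the way the old Guava cache did:
   ```java
     // java.util.Set import assumed; entries would need eventual expiry
     private final Set<Long> _cancelledRequests = ConcurrentHashMap.newKeySet();
   
     @Override
     public void register(OpChain operatorChain) {
       // Refuse OpChains whose request was already cancelled
       if (_cancelledRequests.contains(operatorChain.getId().getRequestId())) {
         operatorChain.cancel(new RuntimeException("Query was cancelled!"));
         return;
       }
       // ... submit to _executorService as before ...
     }
   ```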



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
+  private final Object _id;
+  protected final List<? extends AsyncStream<E>> _mailboxes;
+  protected final ArrayBlockingQueue<Boolean> _newDataReady = new 
ArrayBlockingQueue<>(1);
+  private final long _deadlineMs;
+  /**
+   * An index used to calculate where we are going to start reading.
+   * The invariant is that we are always going to start reading from {@code _lastRead + 1}.
+   * Therefore {@link #_lastRead} must be in the range {@code [-1, mailbox.size() - 1]}
+   */
+  protected int _lastRead;
+  private E _errorBlock = null;
+
+  public BlockingMultiStreamConsumer(Object id, long deadlineMs, List<? 
extends AsyncStream<E>> asyncProducers) {
+    _id = id;
+    _deadlineMs = deadlineMs;
+    AsyncStream.OnNewData onNewData = this::onData;
+    _mailboxes = asyncProducers;
+    _mailboxes.forEach(blockProducer -> 
blockProducer.addOnNewDataListener(onNewData));
+    _lastRead = _mailboxes.size() - 1;
+  }
+
+  protected abstract boolean isError(E element);
+
+  protected abstract boolean isEos(E element);
+
+  protected abstract E onTimeout();
+
+  protected abstract E onException(Exception e);
+
+  protected abstract E onEos();
+
+  @Override
+  public void close() {
+    cancelRemainingMailboxes();
+  }
+
+  public void cancel(Throwable t) {
+    cancelRemainingMailboxes();
+  }
+
+  protected void cancelRemainingMailboxes() {
+    for (AsyncStream<E> mailbox : _mailboxes) {
+      mailbox.cancel();
+    }
+  }
+
+  public void onData() {
+    if (_newDataReady.offer(Boolean.TRUE)) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("New data notification delivered on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+      }
+    } else if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("New data notification ignored on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+    }
+  }
+
+  /**
+   * Reads the next block from any ready mailbox or blocks until one of them is ready.
+   *
+   * The method implements sequential read semantics, meaning that:
+   * <ol>
+   *   <li>EOS is only returned when all mailboxes already emitted EOS or there are no mailboxes</li>
+   *   <li>If an error is read from a mailbox, the error is returned</li>
+   *   <li>If data is read from a mailbox, that data block is returned</li>
+   *   <li>If no mailbox is ready, the calling thread is blocked</li>
+   * </ol>
+   *
+   * Right now the implementation tries to be fair. If one call returned the block from mailbox {@code i}, then the
+   * next call will look at mailbox {@code i+1}, {@code i+2}... in a circular manner.
+   *
+   * In order to unblock a thread blocked here, {@link #onData()} should be called.
+   */
+  public E readBlockBlocking() {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + " 
mailboxSize: " + _mailboxes.size());
+    }
+    // Standard optimistic execution. First we try to read without acquiring 
the lock.
+    E block = readDroppingSuccessEos();
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    boolean timeout = false;
+    try {
+      while (block == null) { // we didn't find a mailbox ready to read, so we need to be pessimistic
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("==[RECEIVE]== Blocked on : " + _id + ". " + System.identityHashCode(_newDataReady));
+        }
+        timeout = _newDataReady.poll(timeoutMs, TimeUnit.MILLISECONDS) == null;
+        LOGGER.debug("==[RECEIVE]== More data available. Trying to read again");
+        block = readDroppingSuccessEos();
+      }
+
+      if (timeout) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.warn("==[RECEIVE]== Timeout on: " + _id);
+        }
+        _errorBlock = onTimeout();
+        return _errorBlock;
+      } else if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("==[RECEIVE]== Ready to emit on: " + _id);
+      }
+    } catch (InterruptedException ex) {
+      return onException(ex);
+    }
+    return block;
+  }
+
+  /**
+   * This is a utility method that tries to read from the different mailboxes in a circular manner.
+   *
+   * The method is a bit more complex than expected because in order to simplify {@link #readBlockBlocking} we added
+   * some extra logic here. For example, this method checks for timeouts, adds some logs, releases mailboxes that
+   * emitted EOS and, in case an error block is found, stores it.
+   *
+   * @return the new block to consume or null if none is found. EOS is only emitted when all mailboxes already
+   * emitted EOS.
+   */
+  @Nullable
+  private E readDroppingSuccessEos() {
+    if (System.currentTimeMillis() > _deadlineMs) {
+      _errorBlock = onTimeout();
+      return _errorBlock;
+    }
+
+    E block = readBlockOrNull();
+    while (block != null && isEos(block) && !_mailboxes.isEmpty()) {

Review Comment:
   (minor) Can `_mailboxes` be empty if `block != null`?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -61,6 +61,9 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn
   @Override
   public void send(TransferableBlock block)
       throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("==[GRPC SEND]== sending data to: " + _id);

Review Comment:
   (minor) Same for other logs
   ```suggestion
         LOGGER.debug("==[GRPC SEND]== sending data to: {}", _id);
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
+  private final Object _id;
+  protected final List<? extends AsyncStream<E>> _mailboxes;
+  protected final ArrayBlockingQueue<Boolean> _newDataReady = new 
ArrayBlockingQueue<>(1);
+  private final long _deadlineMs;
+  /**
+   * An index used to calculate where we are going to start reading.
+   * The invariant is that we are always going to start reading from {@code _lastRead + 1}.
+   * Therefore {@link #_lastRead} must be in the range {@code [-1, mailbox.size() - 1]}
+   */
+  protected int _lastRead;
+  private E _errorBlock = null;
+
+  public BlockingMultiStreamConsumer(Object id, long deadlineMs, List<? 
extends AsyncStream<E>> asyncProducers) {
+    _id = id;
+    _deadlineMs = deadlineMs;
+    AsyncStream.OnNewData onNewData = this::onData;
+    _mailboxes = asyncProducers;
+    _mailboxes.forEach(blockProducer -> 
blockProducer.addOnNewDataListener(onNewData));
+    _lastRead = _mailboxes.size() - 1;
+  }
+
+  protected abstract boolean isError(E element);
+
+  protected abstract boolean isEos(E element);
+
+  protected abstract E onTimeout();
+
+  protected abstract E onException(Exception e);
+
+  protected abstract E onEos();
+
+  @Override
+  public void close() {
+    cancelRemainingMailboxes();
+  }
+
+  public void cancel(Throwable t) {
+    cancelRemainingMailboxes();
+  }
+
+  protected void cancelRemainingMailboxes() {
+    for (AsyncStream<E> mailbox : _mailboxes) {
+      mailbox.cancel();
+    }
+  }
+
+  public void onData() {
+    if (_newDataReady.offer(Boolean.TRUE)) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("New data notification delivered on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+      }
+    } else if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("New data notification ignored on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+    }
+  }
+
+  /**
+   * Reads the next block from any ready mailbox or blocks until one of them is ready.
+   *
+   * The method implements sequential read semantics, meaning that:
+   * <ol>
+   *   <li>EOS is only returned when all mailboxes already emitted EOS or there are no mailboxes</li>
+   *   <li>If an error is read from a mailbox, the error is returned</li>
+   *   <li>If data is read from a mailbox, that data block is returned</li>
+   *   <li>If no mailbox is ready, the calling thread is blocked</li>
+   * </ol>
+   *
+   * Right now the implementation tries to be fair. If one call returned the block from mailbox {@code i}, then the
+   * next call will look at mailbox {@code i+1}, {@code i+2}... in a circular manner.
+   *
+   * In order to unblock a thread blocked here, {@link #onData()} should be called.
+   */
+  public E readBlockBlocking() {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + " 
mailboxSize: " + _mailboxes.size());
+    }
+    // Standard optimistic execution. First we try to read without acquiring 
the lock.
+    E block = readDroppingSuccessEos();
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    boolean timeout = false;
+    try {
+      while (block == null) { // we didn't find a mailbox ready to read, so we need to be pessimistic

Review Comment:
   I somehow see why you want to have this while check. The previous call of this might get a block from line 111 without consuming `_newDataReady`, and the next call might consume `_newDataReady` without having a new block. I feel it works if we break on `timeout`, but let's think through more race conditions and add a test for this.
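   
   For reference, breaking on `timeout` could look roughly like this (sketch):
   ```java
         while (block == null) {
           if (_newDataReady.poll(timeoutMs, TimeUnit.MILLISECONDS) == null) {
             // Timed out waiting for new data: fail immediately
             _errorBlock = onTimeout();
             return _errorBlock;
           }
           block = readDroppingSuccessEos();
           // Re-derive the remaining wait time for the next iteration
           timeoutMs = _deadlineMs - System.currentTimeMillis();
         }
   ```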



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java:
##########
@@ -112,15 +112,14 @@ protected void constructRightBlockSet() {
 
   protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
     List<Object[]> rows = new ArrayList<>();
+    // TODO: Other operators keep the first erroneous block, while this one keeps the last.
+    //  We should decide what we want to do and be consistent with that.

Review Comment:
   I don't follow this `TODO`. Do you mean we should collect the first error block from the right block set if it exists?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
+  private final Object _id;
+  protected final List<? extends AsyncStream<E>> _mailboxes;
+  protected final ArrayBlockingQueue<Boolean> _newDataReady = new 
ArrayBlockingQueue<>(1);
+  private final long _deadlineMs;
+  /**
+   * An index used to calculate where we are going to start reading.
+   * The invariant is that we are always going to start reading from {@code _lastRead + 1}.
+   * Therefore {@link #_lastRead} must be in the range {@code [-1, mailbox.size() - 1]}
+   */
+  protected int _lastRead;
+  private E _errorBlock = null;
+
+  public BlockingMultiStreamConsumer(Object id, long deadlineMs, List<? 
extends AsyncStream<E>> asyncProducers) {
+    _id = id;
+    _deadlineMs = deadlineMs;
+    AsyncStream.OnNewData onNewData = this::onData;
+    _mailboxes = asyncProducers;
+    _mailboxes.forEach(blockProducer -> 
blockProducer.addOnNewDataListener(onNewData));
+    _lastRead = _mailboxes.size() - 1;
+  }
+
+  protected abstract boolean isError(E element);
+
+  protected abstract boolean isEos(E element);
+
+  protected abstract E onTimeout();
+
+  protected abstract E onException(Exception e);
+
+  protected abstract E onEos();
+
+  @Override
+  public void close() {
+    cancelRemainingMailboxes();
+  }
+
+  public void cancel(Throwable t) {
+    cancelRemainingMailboxes();
+  }
+
+  protected void cancelRemainingMailboxes() {
+    for (AsyncStream<E> mailbox : _mailboxes) {
+      mailbox.cancel();
+    }
+  }
+
+  public void onData() {
+    if (_newDataReady.offer(Boolean.TRUE)) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("New data notification delivered on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+      }
+    } else if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("New data notification ignored on " + _id + ". " + 
System.identityHashCode(_newDataReady));
+    }
+  }
+
+  /**
+   * Reads the next block from any ready mailbox or blocks until one of them is ready.
+   *
+   * The method implements sequential read semantics, meaning that:
+   * <ol>
+   *   <li>EOS is only returned when all mailboxes already emitted EOS or there are no mailboxes</li>
+   *   <li>If an error is read from a mailbox, the error is returned</li>
+   *   <li>If data is read from a mailbox, that data block is returned</li>
+   *   <li>If no mailbox is ready, the calling thread is blocked</li>
+   * </ol>
+   *
+   * Right now the implementation tries to be fair. If one call returned the block from mailbox {@code i}, then the
+   * next call will look at mailbox {@code i+1}, {@code i+2}... in a circular manner.
+   *
+   * In order to unblock a thread blocked here, {@link #onData()} should be called.
+   */
+  public E readBlockBlocking() {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + " 
mailboxSize: " + _mailboxes.size());
+    }
+    // Standard optimistic execution. First we try to read without acquiring 
the lock.
+    E block = readDroppingSuccessEos();
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    boolean timeout = false;
+    try {
+      while (block == null) { // we didn't find a mailbox ready to read, so we need to be pessimistic

Review Comment:
   I don't follow this part. Once we reach the timeout, we should immediately return a timeout error. Also, why do we have a `while` loop here? Should we wait on `_newDataReady` just once?


