walterddr commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1280085055


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java:
##########
@@ -62,10 +65,10 @@ public OpChainExecutionContext(MailboxService mailboxService, long requestId, in
     _traceEnabled = traceEnabled;
   }
 
-  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
+  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext, Executor executor) {
     this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
         physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(),
-        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
+        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled(), executor);

Review Comment:
   nit: I was wondering if we should separate the plan context from the machine context.
   Based on what I see, `mailboxService` / `executor` are machine context, and the rest of physicalPlanContext should remain as the plan context.
   
   This should be a good start, but we can do better in the next iteration.
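The machine/plan split suggested in this comment could look roughly like the following. This is a minimal, hypothetical sketch: `MachineContext`, `StagePlanContext`, `SplitOpChainContext` and `MailboxServiceStub` are invented names (the stub only stands in for Pinot's `MailboxService`), and just a few of the plan fields carried by `PhysicalPlanContext` are modeled.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical placeholder for Pinot's MailboxService so this sketch compiles on its own. */
interface MailboxServiceStub {
  String hostname();
}

/** Hypothetical "machine context": per-server resources shared by every query on the server. */
final class MachineContext {
  private final MailboxServiceStub _mailboxService;
  private final Executor _executor;

  MachineContext(MailboxServiceStub mailboxService, Executor executor) {
    _mailboxService = Objects.requireNonNull(mailboxService, "mailboxService");
    _executor = Objects.requireNonNull(executor, "executor");
  }

  MailboxServiceStub getMailboxService() {
    return _mailboxService;
  }

  Executor getExecutor() {
    return _executor;
  }
}

/** Hypothetical "plan context": values that describe one stage of one query. */
final class StagePlanContext {
  private final long _requestId;
  private final int _stageId;
  private final long _deadlineMs;
  private final boolean _traceEnabled;

  StagePlanContext(long requestId, int stageId, long deadlineMs, boolean traceEnabled) {
    _requestId = requestId;
    _stageId = stageId;
    _deadlineMs = deadlineMs;
    _traceEnabled = traceEnabled;
  }

  long getRequestId() {
    return _requestId;
  }

  int getStageId() {
    return _stageId;
  }

  long getDeadlineMs() {
    return _deadlineMs;
  }

  boolean isTraceEnabled() {
    return _traceEnabled;
  }
}

/** Hypothetical execution context that composes the two halves instead of flattening them. */
final class SplitOpChainContext {
  private final MachineContext _machine;
  private final StagePlanContext _plan;

  SplitOpChainContext(MachineContext machine, StagePlanContext plan) {
    _machine = Objects.requireNonNull(machine, "machine");
    _plan = Objects.requireNonNull(plan, "plan");
  }

  MachineContext getMachineContext() {
    return _machine;
  }

  StagePlanContext getPlanContext() {
    return _plan;
  }
}
```

With this shape, one `MachineContext` can be built per server and shared, so each stage only allocates its own `StagePlanContext`.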



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java:
##########
@@ -61,13 +63,18 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistributi
         context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
     if (senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty()) {
       _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
-      _mailboxes = _mailboxIds.stream()
-          .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
-          .collect(Collectors.toCollection(ArrayDeque::new));
     } else {
       _mailboxIds = Collections.emptyList();
-      _mailboxes = new ArrayDeque<>();
     }
+    List<ReadMailboxAsyncStream> asyncStreams = _mailboxIds.stream()

Review Comment:
   note to self. skip reviewing async stream for now



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java:
##########
@@ -112,15 +112,14 @@ protected void constructRightBlockSet() {
 
   protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
     List<Object[]> rows = new ArrayList<>();
+    // TODO: Other operators keep the first erroneous block, while this keep the last.
+    //  We should decide what is what we want to do and be consistent with that.

Review Comment:
   CC @xiangfu0 
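To make the two policies in the TODO concrete, here is a minimal sketch. `BlockStub` is a hypothetical stand-in for `TransferableBlock` that only models `isErrorBlock()`, and `ErrorBlockPolicy` is an invented helper, not existing Pinot code.

```java
import java.util.List;

/** Hypothetical stand-in for TransferableBlock: only models whether a block is an error block. */
final class BlockStub {
  private final String _label;
  private final boolean _error;

  BlockStub(String label, boolean error) {
    _label = label;
    _error = error;
  }

  String getLabel() {
    return _label;
  }

  boolean isErrorBlock() {
    return _error;
  }
}

/** Hypothetical helpers contrasting the two error-propagation policies the TODO mentions. */
final class ErrorBlockPolicy {
  private ErrorBlockPolicy() {
  }

  /** Keep-first: return the first error block seen, or null if there is none. */
  static BlockStub keepFirst(List<BlockStub> blocks) {
    for (BlockStub block : blocks) {
      if (block.isErrorBlock()) {
        return block;
      }
    }
    return null;
  }

  /** Keep-last: scan every block and let each later error block overwrite the earlier one. */
  static BlockStub keepLast(List<BlockStub> blocks) {
    BlockStub result = null;
    for (BlockStub block : blocks) {
      if (block.isErrorBlock()) {
        result = block;
      }
    }
    return result;
  }
}
```

In this sketch, keep-first can stop scanning at the first error, while keep-last has to read every block before it knows which error to report.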



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -167,8 +167,13 @@ public boolean isSuccessfulEndOfStreamBlock() {
   /**
    * @return whether this block represents a NOOP block
    */
+  @Deprecated(forRemoval = true)
   public boolean isNoOpBlock() {
-    return isType(MetadataBlock.MetadataBlockType.NOOP);
+    return false;

Review Comment:
   yes let's clean up the tests so we don't need to keep this 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -188,4 +193,10 @@ private boolean isType(MetadataBlock.MetadataBlockType type) {
     MetadataBlock metadata = (MetadataBlock) _dataBlock;
     return metadata.getType() == type;
   }
+
+  @Override
+  public String toString() {
+    String blockType = isErrorBlock() ? "error" : isSuccessfulEndOfStreamBlock() ? "eos" : "data";
+    return "TransferableBlock{blockType=" + blockType + ", _numRows=" + _numRows + ", _container=" + _container + '}';

Review Comment:
   :+1:



##########
pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java:
##########
@@ -39,6 +39,7 @@ public interface Operator<T extends Block> {
    * @throws EarlyTerminationException if the operator is early-terminated (interrupted) before processing the next
    *         block of data. Operator can early terminated when the query times out, or is already satisfied.
    */
+  @Nullable

Review Comment:
   What is this annotation for? If it only affects v2, shouldn't we annotate the multistage base operator instead?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,9 +99,8 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
           new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
       _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
-          getQueryWorkerIntermExecutorService());
-      _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
+      _scheduler = new OpChainSchedulerService(getQueryWorkerIntermExecutorService());

Review Comment:
   One is enough for now. Later, when we want to handle mixed v1/v2 workloads, we might want to reuse the same executor as the v1 leaf so that the two share resources fairly. But for now this is good.
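A rough sketch of the shared-executor idea, assuming both workloads are simply submitted as tasks to one fixed-size pool. `SharedWorkerPoolSketch` and `runMixedWorkload` are hypothetical names; in `QueryRunner` the intermediate and leaf workers currently get separate pools with separate thread factories.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: v1 leaf tasks and v2 intermediate tasks submitted to one bounded pool. */
final class SharedWorkerPoolSketch {
  private SharedWorkerPoolSketch() {
  }

  static List<String> runMixedWorkload(int numThreads, int numV1Tasks, int numV2Tasks) {
    AtomicInteger threadCounter = new AtomicInteger();
    ThreadFactory threadFactory = runnable -> {
      Thread thread = new Thread(runnable, "shared_worker_" + threadCounter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
    // A single fixed-size pool: both workloads compete for the same numThreads threads.
    ExecutorService sharedPool = Executors.newFixedThreadPool(numThreads, threadFactory);
    try {
      List<Callable<String>> tasks = new ArrayList<>();
      for (int i = 0; i < numV1Tasks; i++) {
        int taskId = i;
        tasks.add(() -> "v1-" + taskId + "@" + Thread.currentThread().getName());
      }
      for (int i = 0; i < numV2Tasks; i++) {
        int taskId = i;
        tasks.add(() -> "v2-" + taskId + "@" + Thread.currentThread().getName());
      }
      List<String> results = new ArrayList<>();
      // invokeAll returns futures in the same order as the submitted task list.
      for (Future<String> future : sharedPool.invokeAll(tasks)) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for tasks", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    } finally {
      sharedPool.shutdown();
    }
  }
}
```

A plain fixed pool like this only caps the total thread count and serves queued tasks in FIFO order, so real fairness between v1 and v2 would still need a scheduling policy on top of the shared executor.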



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
