walterddr commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1280085055

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java:
##########
@@ -62,10 +65,10 @@ public OpChainExecutionContext(MailboxService mailboxService, long requestId, in
     _traceEnabled = traceEnabled;
   }
-  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
+  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext, Executor executor) {
     this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
         physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(),
-        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
+        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled(), executor);

Review Comment:
nit: I was wondering if we should separate the plan context from the machine context. Based on what I see, `mailboxService` / `executor` are machine context, and the rest of physicalPlanContext should remain. This should be a good start, but we can do better in the next iteration.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java:
##########
@@ -61,13 +63,18 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistributi
         context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
     if (senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty()) {
       _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
-      _mailboxes = _mailboxIds.stream()
-          .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
-          .collect(Collectors.toCollection(ArrayDeque::new));
     } else {
       _mailboxIds = Collections.emptyList();
-      _mailboxes = new ArrayDeque<>();
     }
+    List<ReadMailboxAsyncStream> asyncStreams = _mailboxIds.stream()

Review Comment:
Note to self: skip reviewing the async stream for now.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java:
##########
@@ -112,15 +112,14 @@ protected void constructRightBlockSet() {
   protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
     List<Object[]> rows = new ArrayList<>();
+    // TODO: Other operators keep the first erroneous block, while this keep the last.
+    // We should decide what is what we want to do and be consistent with that.

Review Comment:
CC @xiangfu0

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -167,8 +167,13 @@ public boolean isSuccessfulEndOfStreamBlock() {
   /**
    * @return whether this block represents a NOOP block
    */
+  @Deprecated(forRemoval = true)
   public boolean isNoOpBlock() {
-    return isType(MetadataBlock.MetadataBlockType.NOOP);
+    return false;

Review Comment:
Yes, let's clean up the tests so we don't need to keep this.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -188,4 +193,10 @@ private boolean isType(MetadataBlock.MetadataBlockType type) {
     MetadataBlock metadata = (MetadataBlock) _dataBlock;
     return metadata.getType() == type;
   }
+
+  @Override
+  public String toString() {
+    String blockType = isErrorBlock() ? "error" : isSuccessfulEndOfStreamBlock() ? "eos" : "data";
+    return "TransferableBlock{blockType=" + blockType + ", _numRows=" + _numRows + ", _container=" + _container + '}';

Review Comment:
:+1:

##########
pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java:
##########
@@ -39,6 +39,7 @@ public interface Operator<T extends Block> {
    * @throws EarlyTerminationException if the operator is early-terminated (interrupted) before processing the next
    * block of data. Operator can early terminated when the query times out, or is already satisfied.
    */
+  @Nullable

Review Comment:
What's this annotation about? If this only affects v2, should we annotate the multistage base operator instead?

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,9 +99,8 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
         new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
     _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
         new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
-    _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
-        getQueryWorkerIntermExecutorService());
-    _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
+    _scheduler = new OpChainSchedulerService(getQueryWorkerIntermExecutorService());

Review Comment:
1 is enough for now. Later, when we want to handle mixed v1/v2 workloads, we might want to reuse the same executor as the v1 leaf so that it is fair. But for now this is good.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
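
For reference, a minimal sketch of the plan/machine context split suggested in the OpChainExecutionContext review comment. Everything here is hypothetical: `MachineContext`, `PlanContext`, and `OpChainContext` do not exist in Pinot, and `MailboxService` below is a stand-in interface, not Pinot's class. Under those assumptions, machine-scoped resources (mailbox service, executor) are built once per server and shared, while plan-scoped values (request id, stage id, deadline, trace flag) are built per stage.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: none of these types exist in Pinot.
public class ContextSplitSketch {

  /** Stand-in for a per-server mailbox service (NOT Pinot's MailboxService API). */
  interface MailboxService {
    String hostname();
  }

  /** Machine-scoped resources: created once per server, shared by every OpChain. */
  record MachineContext(MailboxService mailboxService, Executor executor) { }

  /** Plan-scoped values: created per stage of a single query. */
  record PlanContext(long requestId, int stageId, long deadlineMs, boolean traceEnabled) { }

  /** What an OpChain would receive: the two parts composed instead of flattened. */
  record OpChainContext(MachineContext machine, PlanContext plan) {
    // Deadline checks only need the plan-scoped part.
    boolean isExpired(long nowMs) {
      return nowMs >= plan.deadlineMs();
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    MachineContext machine = new MachineContext(() -> "localhost", pool);

    // Two stages of the same query share one MachineContext instance.
    OpChainContext stage1 = new OpChainContext(machine, new PlanContext(42L, 1, 10_000L, false));
    OpChainContext stage2 = new OpChainContext(machine, new PlanContext(42L, 2, 10_000L, true));

    System.out.println(stage1.machine() == stage2.machine()); // true
    System.out.println(stage2.plan().stageId());              // 2
    System.out.println(stage1.isExpired(5_000L));             // false
    pool.shutdown();
  }
}
```

Composing the two parts, rather than threading every field through one long constructor, would let a server hand each OpChain the same machine-scoped object while only the plan-scoped part varies per stage.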