walterddr commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1280085055
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java:
##########
@@ -62,10 +65,10 @@ public OpChainExecutionContext(MailboxService
mailboxService, long requestId, in
_traceEnabled = traceEnabled;
}
- public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
+ public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext, Executor executor) {
this(physicalPlanContext.getMailboxService(),
physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
physicalPlanContext.getStageMetadata(),
- physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
+ physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled(), executor);
Review Comment:
nit: I was wondering if we should separate the plan context from the
machine context.
Based on what I see, `mailboxService` and `executor` are machine context, and
the rest of physicalPlanContext should still remain.
This is a good start, but we can do better in the next iteration.
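To make the suggested split concrete, here is a minimal sketch of one way it could look. All class names (`MachineContext`, `PlanContext`, `SplitExecutionContext`) are hypothetical and do not exist in this PR, and `Object` stands in for the real `MailboxService` so the example stays self-contained.

```java
import java.util.concurrent.Executor;

/** Hypothetical: resources owned by the server process and shared across queries. */
final class MachineContext {
  private final Object _mailboxService; // stand-in for the real MailboxService
  private final Executor _executor;

  MachineContext(Object mailboxService, Executor executor) {
    _mailboxService = mailboxService;
    _executor = executor;
  }

  Object getMailboxService() {
    return _mailboxService;
  }

  Executor getExecutor() {
    return _executor;
  }
}

/** Hypothetical: per-request state that describes the plan being executed. */
final class PlanContext {
  private final long _requestId;
  private final int _stageId;
  private final long _deadlineMs;
  private final boolean _traceEnabled;

  PlanContext(long requestId, int stageId, long deadlineMs, boolean traceEnabled) {
    _requestId = requestId;
    _stageId = stageId;
    _deadlineMs = deadlineMs;
    _traceEnabled = traceEnabled;
  }

  long getRequestId() {
    return _requestId;
  }

  int getStageId() {
    return _stageId;
  }

  long getDeadlineMs() {
    return _deadlineMs;
  }

  boolean isTraceEnabled() {
    return _traceEnabled;
  }
}

/** Hypothetical: an execution context that composes the two instead of flattening them. */
final class SplitExecutionContext {
  private final MachineContext _machine;
  private final PlanContext _plan;

  SplitExecutionContext(MachineContext machine, PlanContext plan) {
    _machine = machine;
    _plan = plan;
  }

  MachineContext getMachine() {
    return _machine;
  }

  PlanContext getPlan() {
    return _plan;
  }
}
```

With this shape, the constructor would take two arguments rather than growing by one parameter each time a machine-level resource is added.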
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java:
##########
@@ -61,13 +63,18 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext
context, RelDistributi
context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
if (senderMailBoxMetadatas != null &&
!senderMailBoxMetadatas.getMailBoxIdList().isEmpty()) {
_mailboxIds = MailboxIdUtils.toMailboxIds(requestId,
senderMailBoxMetadatas);
- _mailboxes = _mailboxIds.stream()
- .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
- .collect(Collectors.toCollection(ArrayDeque::new));
} else {
_mailboxIds = Collections.emptyList();
- _mailboxes = new ArrayDeque<>();
}
+ List<ReadMailboxAsyncStream> asyncStreams = _mailboxIds.stream()
Review Comment:
note to self. skip reviewing async stream for now
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java:
##########
@@ -112,15 +112,14 @@ protected void constructRightBlockSet() {
protected TransferableBlock constructResultBlockSet(TransferableBlock
leftBlock) {
List<Object[]> rows = new ArrayList<>();
+ // TODO: Other operators keep the first erroneous block, while this keep the last.
+ // We should decide what is what we want to do and be consistent with that.
Review Comment:
CC @xiangfu0
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -167,8 +167,13 @@ public boolean isSuccessfulEndOfStreamBlock() {
/**
* @return whether this block represents a NOOP block
*/
+ @Deprecated(forRemoval = true)
public boolean isNoOpBlock() {
- return isType(MetadataBlock.MetadataBlockType.NOOP);
+ return false;
Review Comment:
yes let's clean up the tests so we don't need to keep this
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -188,4 +193,10 @@ private boolean isType(MetadataBlock.MetadataBlockType
type) {
MetadataBlock metadata = (MetadataBlock) _dataBlock;
return metadata.getType() == type;
}
+
+ @Override
+ public String toString() {
+ String blockType = isErrorBlock() ? "error" : isSuccessfulEndOfStreamBlock() ? "eos" : "data";
+ return "TransferableBlock{blockType=" + blockType + ", _numRows=" + _numRows + ", _container=" + _container + '}';
Review Comment:
:+1:
##########
pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java:
##########
@@ -39,6 +39,7 @@ public interface Operator<T extends Block> {
* @throws EarlyTerminationException if the operator is early-terminated
(interrupted) before processing the next
* block of data. Operator can early terminated when the query times
out, or is already satisfied.
*/
+ @Nullable
Review Comment:
What's this annotation for? If it only affects v2, should we annotate the
multistage base operator instead?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,9 +99,8 @@ public void init(PinotConfiguration config,
InstanceDataManager instanceDataMana
new NamedThreadFactory("query_intermediate_worker_on_" + _port +
"_port"));
_queryWorkerLeafExecutorService =
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
- _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
- getQueryWorkerIntermExecutorService());
- _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
+ _scheduler = new OpChainSchedulerService(getQueryWorkerIntermExecutorService());
Review Comment:
1 is enough for now. Later, when we want to deal with mixed v1/v2 workloads, we
might want to reuse the same executor as the v1 leaf so that scheduling is fair.
But for now this is good.
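As a rough illustration of the "reuse the same executor" idea, the sketch below submits a leaf-style task and an intermediate-style task to a single fixed pool, so both compete for the same threads. The class name `SharedExecutorSketch`, the method `runOnSharedPool`, and the pool size are assumptions made for this example, not anything in the PR or in Pinot.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SharedExecutorSketch {
  // Hypothetical pool size, chosen only for the example.
  static final int WORKER_THREADS = 4;

  private SharedExecutorSketch() {
  }

  /**
   * Runs one "leaf" task and one "intermediate" task on the same pool and
   * returns the names of the threads that executed them.
   */
  static String[] runOnSharedPool() {
    ExecutorService shared = Executors.newFixedThreadPool(WORKER_THREADS);
    try {
      // Both kinds of work go through the same queue and the same threads.
      Future<String> leaf = shared.submit(() -> Thread.currentThread().getName());
      Future<String> intermediate = shared.submit(() -> Thread.currentThread().getName());
      return new String[] {leaf.get(), intermediate.get()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      shared.shutdown();
    }
  }
}
```

Whether a single shared pool actually gives the desired fairness would depend on task sizes and queueing policy, which this sketch does not model.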
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]