yashmayya commented on code in PR #14507:
URL: https://github.com/apache/pinot/pull/14507#discussion_r1894219555

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -368,6 +368,13 @@ public static class Broker {
     public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint";
     public static final boolean DEFAULT_INFER_PARTITION_HINT = false;
+    /**
+     * Whether to use spools in multistage query engine by default.
+     * This value can always be overridden by {@link Request.QueryOptionKey#USE_SPOOLS} query option
+     */
+    public static final String CONFIG_OF_SPOOLS = "pinot.broker.multistage.spools";

Review Comment:
   Should the config name include "default" to make it clear that it can still be overridden per query?

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -207,13 +207,17 @@ protected TransferableBlock getNextBlock()
         buildBroadcastHashTable();
       }
       if (_upstreamErrorBlock != null) {
+        LOGGER.trace("Returning upstream error block for join operator");

Review Comment:
   Are all the changes in this file unrelated to spools?

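For context on the config naming question above: the Javadoc in the `CommonConstants` hunk says the broker-level value is only a default and that the `USE_SPOOLS` query option can override it. A minimal sketch of that precedence follows. The class, the method name, and the `"useSpools"` key string are hypothetical; the actual value of `QueryOptionKey#USE_SPOOLS` and the real resolution code are not shown in this diff.

```java
import java.util.Map;

class SpoolOptionSketch {
  // Hypothetical key string; the real value of QueryOptionKey#USE_SPOOLS is not shown in the diff.
  static final String USE_SPOOLS_OPTION = "useSpools";

  // The per-query option wins when present; otherwise the broker-level default applies.
  static boolean resolveUseSpools(Map<String, String> queryOptions, boolean brokerDefault) {
    String perQuery = queryOptions.get(USE_SPOOLS_OPTION);
    return perQuery != null ? Boolean.parseBoolean(perQuery) : brokerDefault;
  }

  public static void main(String[] args) {
    // No query option set: falls back to the broker default (true here).
    System.out.println(resolveUseSpools(Map.of(), true));
    // Query option set: overrides the broker default.
    System.out.println(resolveUseSpools(Map.of(USE_SPOOLS_OPTION, "false"), true));
  }
}
```

If the setting really behaves this way, a name that signals "default" would match the fallback role the broker value plays here.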
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java:
##########
@@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
   private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
     appendInfo(node, context);
-    int receiverStageId = node.getReceiverStageId();
-    List<MailboxInfo> receiverMailboxInfos =
-        _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
-            .getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+    List<Stream<String>> perStageDescriptions = new ArrayList<>();
+    // This iterator is guaranteed to be sorted by stageId
+    for (Integer receiverStageId : node.getReceiverStageIds()) {
+      List<MailboxInfo> receiverMailboxInfos =
+          _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
+              .getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+      // Sort to ensure print order
+      Stream<String> stageDescriptions = receiverMailboxInfos.stream()
+          .sorted(Comparator.comparingInt(MailboxInfo::getPort))
+          .map(v -> "[" + receiverStageId + "]@" + v);
+      perStageDescriptions.add(stageDescriptions);
+    }
     context._builder.append("->");
-    // Sort to ensure print order
-    String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
-        .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
+    String receivers = perStageDescriptions.stream()
+        .flatMap(Function.identity())
+        .collect(Collectors.joining(",", "{", "}"));

Review Comment:
   Looks like the format has changed slightly? Let's update the doc comment above this if yes.

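To make the format question above concrete, here is a small standalone sketch of the new joining logic from the hunk. `FakeMailboxInfo` and its `host:port` `toString()` are hypothetical stand-ins, since the real `MailboxInfo` string form is not shown in this diff; only the per-stage sort, the `[stageId]@` prefix, and the `{...}` joining are taken from the added lines.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReceiverFormatSketch {
  // Hypothetical stand-in for MailboxInfo; the toString() format is an assumption.
  static final class FakeMailboxInfo {
    final String host;
    final int port;

    FakeMailboxInfo(String host, int port) {
      this.host = host;
      this.port = port;
    }

    int getPort() {
      return port;
    }

    @Override
    public String toString() {
      return host + ":" + port;
    }
  }

  // Mirrors the added lines: one stream per receiver stage, sorted by port, then flattened and joined.
  static String describeReceivers(Map<Integer, List<FakeMailboxInfo>> infosByStage) {
    List<Stream<String>> perStageDescriptions = new ArrayList<>();
    for (Map.Entry<Integer, List<FakeMailboxInfo>> entry : infosByStage.entrySet()) {
      int receiverStageId = entry.getKey();
      Stream<String> stageDescriptions = entry.getValue().stream()
          .sorted(Comparator.comparingInt(FakeMailboxInfo::getPort))
          .map(v -> "[" + receiverStageId + "]@" + v);
      perStageDescriptions.add(stageDescriptions);
    }
    return perStageDescriptions.stream()
        .flatMap(Function.identity())
        .collect(Collectors.joining(",", "{", "}"));
  }

  // Sample input: a TreeMap keeps stage ids sorted, as the code comment in the hunk assumes.
  static Map<Integer, List<FakeMailboxInfo>> sampleInput() {
    Map<Integer, List<FakeMailboxInfo>> infos = new TreeMap<>();
    infos.put(2, List.of(new FakeMailboxInfo("hostC", 9002)));
    infos.put(1, List.of(new FakeMailboxInfo("hostB", 9001), new FakeMailboxInfo("hostA", 9000)));
    return infos;
  }

  public static void main(String[] args) {
    // prints {[1]@hostA:9000,[1]@hostB:9001,[2]@hostC:9002}
    System.out.println(describeReceivers(sampleInput()));
  }
}
```

With a single receiver stage the output has the same shape as before; with several, entries for different stage ids now appear inside one pair of braces, which is the part a doc comment would need to mention.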
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -106,4 +106,9 @@ public boolean isEarlyTerminated() {
   public boolean isTerminated() {
     return _isTerminated;
   }
+
+  @Override
+  public String toString() {
+    return "m" + _id;

Review Comment:
   `g` and `m` might not be easily understandable prefixes; could we use `grpc |` / `memory |` as prefixes to the mailbox ID instead?

##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java:
##########
@@ -117,8 +117,18 @@ private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode pr
   private static MailboxSendNode deserializeMailboxSendNode(Plan.PlanNode protoNode) {
     Plan.MailboxSendNode protoMailboxSendNode = protoNode.getMailboxSendNode();
+
+    List<Integer> receiverIds;
+    List<Integer> protoReceiverIds = protoMailboxSendNode.getReceiverStageIdsList();
+    if (protoReceiverIds == null || protoReceiverIds.isEmpty()) {

Review Comment:
   The `null` case should never be possible, right? Even with an older broker that serializes using the older proto message without this field, the default value is populated here on deserialization, and for `repeated int32` that should be an empty list.

##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -37,10 +40,7 @@
 public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
-  public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();
-
-  private DispatchablePlanVisitor() {
-  }
+  private final Set<MailboxSendNode> _visited = Collections.newSetFromMap(new IdentityHashMap<>());

Review Comment:
   What are the changes in this class for?

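As background for the `_visited` field in the hunk above: `Collections.newSetFromMap(new IdentityHashMap<>())` produces a set that compares elements by reference (`==`) rather than by `equals()`. The sketch below contrasts it with a `HashSet`. `FakeSendNode` is a hypothetical stand-in with value-based equality; whether `MailboxSendNode` defines `equals()` this way, and why the visitor needs the set, are assumptions not confirmed by this diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

class IdentityVisitedSketch {
  // Hypothetical node with value-based equals(); stands in for a plan node.
  static final class FakeSendNode {
    final int stageId;

    FakeSendNode(int stageId) {
      this.stageId = stageId;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof FakeSendNode && ((FakeSendNode) o).stageId == stageId;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(stageId);
    }
  }

  // Adds every node to the given set and reports how many distinct entries it kept.
  static int countDistinct(Set<FakeSendNode> visited, FakeSendNode... nodes) {
    for (FakeSendNode node : nodes) {
      visited.add(node);
    }
    return visited.size();
  }

  public static void main(String[] args) {
    FakeSendNode a = new FakeSendNode(1);
    FakeSendNode b = new FakeSendNode(1); // equal to a by value, but a different object

    Set<FakeSendNode> identitySet = Collections.newSetFromMap(new IdentityHashMap<>());
    // prints 2: a and b are distinct references; the repeated a is ignored
    System.out.println(countDistinct(identitySet, a, b, a));
    // prints 1: a HashSet treats a and b as the same element
    System.out.println(countDistinct(new HashSet<>(), a, b, a));
  }
}
```

One plausible reading, offered only as a guess, is that with spools the same send node object can be reached through more than one receiver, so the visitor tracks object identity to avoid processing it twice. That would also explain why the stateless singleton was dropped, since the set is per-instance state.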
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() {
       return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT;
     }
+    /**
+     * Whether to use spools or not.
+     *
+     * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
+     * This default value can be always overridden at query level by the query option
+     * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}.
+     */
+    @Value.Default
+    default boolean defaultUseSpools() {
+      return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
+    }

Review Comment:
   This sounds like it should be configurable but is using a static default value?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org