yashmayya commented on code in PR #14507:
URL: https://github.com/apache/pinot/pull/14507#discussion_r1894219555


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -368,6 +368,13 @@ public static class Broker {
     public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint";
     public static final boolean DEFAULT_INFER_PARTITION_HINT = false;
 
+    /**
+     * Whether to use spools in multistage query engine by default.
+     * This value can always be overridden by {@link Request.QueryOptionKey#USE_SPOOLS} query option
+     */
+    public static final String CONFIG_OF_SPOOLS = "pinot.broker.multistage.spools";

Review Comment:
   Should the config name include "default" to make it clear that it can still 
be overridden per query?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -207,13 +207,17 @@ protected TransferableBlock getNextBlock()
       buildBroadcastHashTable();
     }
     if (_upstreamErrorBlock != null) {
+      LOGGER.trace("Returning upstream error block for join operator");

Review Comment:
   Are all the changes in this file unrelated to spools?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java:
##########
@@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
   private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
     appendInfo(node, context);
 
-    int receiverStageId = node.getReceiverStageId();
-    List<MailboxInfo> receiverMailboxInfos =
-        _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
-            .getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+    List<Stream<String>> perStageDescriptions = new ArrayList<>();
+    // This iterator is guaranteed to be sorted by stageId
+    for (Integer receiverStageId : node.getReceiverStageIds()) {
+      List<MailboxInfo> receiverMailboxInfos =
+          _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
+              .getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+      // Sort to ensure print order
+      Stream<String> stageDescriptions = receiverMailboxInfos.stream()
+          .sorted(Comparator.comparingInt(MailboxInfo::getPort))
+          .map(v -> "[" + receiverStageId + "]@" + v);
+      perStageDescriptions.add(stageDescriptions);
+    }
     context._builder.append("->");
-    // Sort to ensure print order
-    String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
-        .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
+    String receivers = perStageDescriptions.stream()
+        .flatMap(Function.identity())
+        .collect(Collectors.joining(",", "{", "}"));

Review Comment:
   Looks like the output format has changed slightly? If so, let's update the 
doc comment above this method to match.
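
To make the format question concrete, here is a minimal, self-contained sketch of how the new loop assembles the receiver string. `Mailbox` is a hypothetical stand-in for `MailboxInfo`, and its `host:port` rendering is an assumption, not the real `toString()` output; only the sort, flatten, and join steps mirror the diff above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ExplainReceiversSketch {
  // Hypothetical stand-in for MailboxInfo; the host:port rendering is assumed.
  static final class Mailbox {
    final String _host;
    final int _port;

    Mailbox(String host, int port) {
      _host = host;
      _port = port;
    }

    int getPort() {
      return _port;
    }

    @Override
    public String toString() {
      return _host + ":" + _port;
    }
  }

  // Builds "{[stage]@mailbox,...}" with stages in key order and mailboxes sorted by port.
  static String describeReceivers(SortedMap<Integer, List<Mailbox>> mailboxesByStage) {
    List<Stream<String>> perStageDescriptions = new ArrayList<>();
    for (Map.Entry<Integer, List<Mailbox>> entry : mailboxesByStage.entrySet()) {
      int receiverStageId = entry.getKey();
      perStageDescriptions.add(entry.getValue().stream()
          .sorted(Comparator.comparingInt(Mailbox::getPort))
          .map(v -> "[" + receiverStageId + "]@" + v));
    }
    return perStageDescriptions.stream()
        .flatMap(Function.identity())
        .collect(Collectors.joining(",", "{", "}"));
  }
}
```

Under these assumptions, a sender with several receiver stages prints all of them inside a single pair of braces, grouped by stage ID and ordered by port within each stage.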



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -106,4 +106,9 @@ public boolean isEarlyTerminated() {
   public boolean isTerminated() {
     return _isTerminated;
   }
+
+  @Override
+  public String toString() {
+    return "m" + _id;

Review Comment:
   `g` and `m` might not be very easily understandable prefixes; could we use 
`grpc |` / `memory |` as prefixes to the mailbox ID instead?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java:
##########
@@ -117,8 +117,18 @@ private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode pr
 
   private static MailboxSendNode deserializeMailboxSendNode(Plan.PlanNode protoNode) {
     Plan.MailboxSendNode protoMailboxSendNode = protoNode.getMailboxSendNode();
+
+    List<Integer> receiverIds;
+    List<Integer> protoReceiverIds = protoMailboxSendNode.getReceiverStageIdsList();
+    if (protoReceiverIds == null || protoReceiverIds.isEmpty()) {

Review Comment:
   The `null` case should never be possible, right? Even if an older broker 
serializes with the older proto message that lacks this field, deserialization 
populates the default value here, which should be an empty list for 
`repeated int32`.
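
As a rough sketch of what the simplified check could look like: the generated Java getter for a `repeated` field returns an empty list rather than `null` when the field is absent, so `isEmpty()` alone covers the old-broker case. The method name `resolveReceiverIds` and the fallback to a single legacy receiver stage ID are assumptions for illustration, since the diff above is cut off before the body of the `if`.

```java
import java.util.Collections;
import java.util.List;

class ReceiverIdsSketch {
  // Hypothetical helper: repeatedField models getReceiverStageIdsList(), which is never null.
  static List<Integer> resolveReceiverIds(List<Integer> repeatedField, int legacyReceiverStageId) {
    if (repeatedField.isEmpty()) {
      // Assumed fallback: an older sender only populated the single legacy ID.
      return Collections.singletonList(legacyReceiverStageId);
    }
    return repeatedField;
  }
}
```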



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -37,10 +40,7 @@
 
 
 public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
-  public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();
-
-  private DispatchablePlanVisitor() {
-  }
+  private final Set<MailboxSendNode> _visited = Collections.newSetFromMap(new IdentityHashMap<>());

Review Comment:
   What are the changes in this class for?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() {
       return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT;
     }
 
+    /**
+     * Whether to use spools or not.
+     *
+     * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
+     * This default value can be always overridden at query level by the query option
+     * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}.
+     */
+    @Value.Default
+    default boolean defaultUseSpools() {
+      return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
+    }

Review Comment:
   This sounds like it should be configurable but is using a static default 
value? 
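
For reference, the precedence the Javadoc describes could be sketched roughly as below. This is not the PR's implementation: plain maps stand in for the broker configuration and the query options, the option string `"useSpools"` is a hypothetical value for `USE_SPOOLS`, and `false` is an assumed value for `DEFAULT_OF_SPOOLS`. Only the config key comes from the diff above.

```java
import java.util.Map;

class SpoolsDefaultSketch {
  static final String CONFIG_OF_SPOOLS = "pinot.broker.multistage.spools";
  // Assumed value; the diff does not show the actual constant.
  static final boolean DEFAULT_OF_SPOOLS = false;
  // Hypothetical option string standing in for QueryOptionKey#USE_SPOOLS.
  static final String USE_SPOOLS_OPTION = "useSpools";

  // Broker-level default: read from configuration, else the static default.
  static boolean brokerDefault(Map<String, String> brokerConfig) {
    String configured = brokerConfig.get(CONFIG_OF_SPOOLS);
    return configured != null ? Boolean.parseBoolean(configured) : DEFAULT_OF_SPOOLS;
  }

  // Per-query value: the query option wins when present, else the broker default.
  static boolean useSpools(Map<String, String> brokerConfig, Map<String, String> queryOptions) {
    String option = queryOptions.get(USE_SPOOLS_OPTION);
    return option != null ? Boolean.parseBoolean(option) : brokerDefault(brokerConfig);
  }
}
```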



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

