yashmayya commented on code in PR #13733:
URL: https://github.com/apache/pinot/pull/13733#discussion_r1750199715


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -56,115 +58,151 @@
 
 
 /**
- * This visitor constructs a physical plan of operators from a {@link PlanNode} tree. Note that
- * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
+ * A class used to transform PlanNodes (considered logical) into MultiStageOperator (considered physical).
+ *
+ * Note that this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, OpChainExecutionContext)}
+ * <p><b>Notice</b>: Here <em>physical</em> is used in the context of multi-stage engine, which means it transforms
+ * logical PlanNodes into MultiStageOperator.
+ * Probably another adjective should be used given physical means different things for Calcite and single-stage</p>
  */
-public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+// TODO: rename as PhysicalPlaner or PlanNodeToOperator
+public class PhysicalPlanVisitor {
+
+  private PhysicalPlanVisitor() {
+  }
 
-  private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
+  public static OpChain planToOperators(PlanNode node, OpChainExecutionContext context) {
+    return planToOperators(node, context, (planNode, operator) -> {
+      // Do nothing
+    });
+  }
 
-  public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext context) {
-    MultiStageOperator root = node.visit(INSTANCE, context);
+  /**
+   * Like {@link #planToOperators(PlanNode, OpChainExecutionContext, BiConsumer)} but keeps tracking of the original
+   * PlanNode that created each MultiStageOperator
+   * @param tracker a consumer that will be called each time a MultiStageOperator is created.
+   * @return
+   */
+  public static OpChain planToOperators(PlanNode node, OpChainExecutionContext context,
+      BiConsumer<PlanNode, MultiStageOperator> tracker) {
+    MyVisitor visitor = new MyVisitor(tracker);
+    MultiStageOperator root = node.visit(visitor, context);
+    tracker.accept(node, root);
     return new OpChain(context, root);
   }
 
-  private <T extends PlanNode> MultiStageOperator visit(T node, OpChainExecutionContext context) {
-    if (context.getLeafStageContext() != null && context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
-      ServerPlanRequestContext leafStageContext = context.getLeafStageContext();
-      return new LeafStageTransferableBlockOperator(context, leafStageContext.getServerQueryRequests(),
-          leafStageContext.getLeafStageBoundaryNode().getDataSchema(), leafStageContext.getLeafQueryExecutor(),
-          leafStageContext.getExecutorService());
-    } else {
-      return node.visit(this, context);
+  private static class MyVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {

Review Comment:
   Was this change made to accommodate the `PlanNode` <-> `MultiStageOperator` tracking mechanism? Couldn't we just use a static factory method instead? Even if not, the name `MyVisitor` doesn't convey its purpose very well. It does what `PhysicalPlanVisitor` itself did earlier, and since it's now an inner class, maybe we could just name it something like `Visitor`.
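
   To make the suggestion concrete, here is a minimal sketch of a static entry point that delegates to a private nested `Visitor` carrying the tracker. `Node` and `Operator` are hypothetical, heavily simplified stand-ins for `PlanNode` and `MultiStageOperator`, and the class name is invented for illustration; this is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class TrackingPlannerSketch {
  // Hypothetical stand-in for PlanNode: a name plus its input nodes.
  static final class Node {
    final String name;
    final List<Node> inputs;

    Node(String name, Node... inputs) {
      this.name = name;
      this.inputs = List.of(inputs);
    }
  }

  // Hypothetical stand-in for MultiStageOperator.
  static final class Operator {
    final String name;
    final List<Operator> children;

    Operator(String name, List<Operator> children) {
      this.name = name;
      this.children = children;
    }
  }

  private TrackingPlannerSketch() {
  }

  // Static entry point: callers never see the visitor class.
  public static Operator planToOperators(Node root, BiConsumer<Node, Operator> tracker) {
    return new Visitor(tracker).visit(root);
  }

  // Private nested class; the short name is enough because the outer class gives the context.
  private static final class Visitor {
    private final BiConsumer<Node, Operator> tracker;

    Visitor(BiConsumer<Node, Operator> tracker) {
      this.tracker = tracker;
    }

    Operator visit(Node node) {
      List<Operator> children = new ArrayList<>();
      for (Node input : node.inputs) {
        children.add(visit(input));
      }
      Operator operator = new Operator(node.name + "Operator", children);
      // Report every (node, operator) pair exactly once, children first.
      tracker.accept(node, operator);
      return operator;
    }
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    Node plan = new Node("Filter", new Node("MailboxReceive"));
    planToOperators(plan, (node, operator) -> seen.add(node.name + "->" + operator.name));
    System.out.println(seen);
  }
}
```

   In this sketch the tracker is invoked in a single place, so no node is reported twice.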



##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java:
##########
@@ -691,4 +662,132 @@ private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrune
     instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
         String.valueOf(prunerStats.getValuePruned()));
   }
+
+  private List<IndexSegment> selectSegments(List<IndexSegment> indexSegments, QueryContext queryContext,
+      TimerContext timerContext, ExecutorService executorService, SegmentPrunerStatistics prunerStats) {
+    List<IndexSegment> selectedSegments;
+    if ((queryContext.getFilter() != null && queryContext.getFilter().isConstantFalse()) || (
+        queryContext.getHavingFilter() != null && queryContext.getHavingFilter().isConstantFalse())) {
+      selectedSegments = Collections.emptyList();
+    } else {
+      TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+      selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats, executorService);
+      segmentPruneTimer.stopAndRecord();
+    }
+    return selectedSegments;
+  }
+
+  private Plan planCombineQuery(QueryContext queryContext, TimerContext timerContext, ExecutorService executorService,
+      @Nullable ResultsBlockStreamer streamer, List<SegmentContext> selectedSegmentContexts) {
+    TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+
+    Plan queryPlan;
+    if (streamer != null) {
+      queryPlan = _planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext, executorService,
+          streamer, _serverMetrics);
+    } else {
+      queryPlan = _planMaker.makeInstancePlan(selectedSegmentContexts, queryContext, executorService, _serverMetrics);
+    }
+    planBuildTimer.stopAndRecord();
+    return queryPlan;
+  }
+
+  private InstanceResponseBlock execute(TableDataManager tableDataManager,
+      List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext,
+      ExecutorService executorService, ResultsBlockStreamer streamer, boolean enableStreaming,
+      List<IndexSegment> selectedSegments, List<SegmentContext> selectedSegmentContexts)

Review Comment:
   nit: there are a couple of unused parameters here.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java:
##########
@@ -77,6 +78,6 @@ public int getNumDocsScanned() {
 
   @Override
   protected String getExplainName() {
-    return EXPLAIN_NAME;
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);

Review Comment:
   This will also change the v1 explain plan, right? I guess we want to avoid that?
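
   For reference, a small sketch of what an upper-underscore to upper-camel conversion does to an explain name. The converter below is a hand-rolled, dependency-free stand-in for Guava's `CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, ...)`, and the constant value is a hypothetical example rather than the operator's confirmed `EXPLAIN_NAME`.

```java
import java.util.Locale;

public class ExplainNameSketch {
  private ExplainNameSketch() {
  }

  // Stand-in for the Guava conversion: split on '_', capitalize each word, lower-case the rest.
  static String upperUnderscoreToUpperCamel(String name) {
    StringBuilder result = new StringBuilder();
    for (String word : name.split("_")) {
      if (word.isEmpty()) {
        continue;
      }
      result.append(Character.toUpperCase(word.charAt(0)));
      result.append(word.substring(1).toLowerCase(Locale.ROOT));
    }
    return result.toString();
  }

  public static void main(String[] args) {
    // Hypothetical explain name, used only to show the shape of the change.
    String explainName = "SELECT_PARTIAL_ORDER_BY_ASC";
    System.out.println(explainName + " -> " + upperUnderscoreToUpperCamel(explainName));
  }
}
```

   Any caller that prints `getExplainName()`, including the v1 explain path, would see the camel-case form after this change, which is the concern raised above.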



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.plan.PinotExplainedRelNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts a {@link PlanNode} into a {@link RelNode}.
+ *
+ * This class is used to convert serialized plan nodes into RelNodes so they can be used when explain with
+ * implementation is requested. Therefore some nodes may be transformed in a way that loses information that is
+ * required to create an actual executable plan but not necessary in order to describe the plan.
+ */
+public final class PlanNodeToRelConverter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PlanNodeToRelConverter.class);
+
+  private PlanNodeToRelConverter() {
+  }
+
+  public static RelNode convert(RelBuilder builder, PlanNode planNode) {
+    ConverterVisitor visitor = new ConverterVisitor(builder);
+    planNode.visit(visitor, null);
+
+    return visitor.build();
+  }
+
+  private static class ConverterVisitor implements PlanNodeVisitor<Void, Void> {

Review Comment:
   Looks like the static inner class does all the heavy lifting here and the 
outer class doesn't really have any logic -- why not simply make 
`PlanNodeToRelConverter` itself implement the `PlanNodeVisitor` interface and 
get rid of this static inner class?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java:
##########
@@ -155,11 +157,26 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
       }
     }
 
+    ExplainMode explainMode;
+    if (!pinotQuery.isExplain()) {
+      explainMode = ExplainMode.NONE;
+    } else if (isUsingV1(pinotQuery)) {
+      explainMode = ExplainMode.NODE;
+    } else {
+      explainMode = ExplainMode.DESCRIPTION;
+    }
+
     return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
         .setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
         .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
         .setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
         .setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
-        .setExplain(pinotQuery.isExplain()).build();
+        .setExplain(explainMode).build();
+  }
+
+  private static boolean isUsingV1(PinotQuery pinotQuery) {

Review Comment:
   Shouldn't this be `isUsingV2` or `isUsingMultistageEngine`? Also, couldn't we just use `QueryOptionsUtils::isUseMultistageEngine` instead?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java:
##########
@@ -50,12 +51,17 @@ public ManagedChannel getChannel() {
   }
 
   public void submit(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
-      Consumer<AsyncQueryDispatchResponse> callback) {
-    _dispatchStub.withDeadline(deadline).submit(request, new DispatchObserver(virtualServer, callback));
+      Consumer<AsyncResponse<Worker.QueryResponse>> callback) {
+    _dispatchStub.withDeadline(deadline).submit(request, new LastValueDispatchObserver<>(virtualServer, callback));
   }
 
   public void cancel(long requestId) {
     Worker.CancelRequest cancelRequest = Worker.CancelRequest.newBuilder().setRequestId(requestId).build();
     _dispatchStub.cancel(cancelRequest, NO_OP_CANCEL_STREAM_OBSERVER);
   }
+
+  public void explain(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
+      Consumer<AsyncResponse<List<Worker.ExplainResponse>>> callback) {
+    _dispatchStub.withDeadline(deadline).explain(request, new AllValuesDispatchObserver<>(virtualServer, callback));

Review Comment:
   I didn't quite follow why we care only about the last value from the broker <-> server response stream for a regular query request, but about all values from the stream for an explain query request. Could you clarify?
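
   To frame the question, here is a minimal sketch of the two behaviours the observer names suggest. The implementations of `LastValueDispatchObserver` and `AllValuesDispatchObserver` are not shown in this hunk, so everything below is an assumption: `Observer` is a hypothetical, simplified stand-in for a gRPC-style stream observer, and both classes are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DispatchObserverSketch {
  // Hypothetical, simplified stream observer (no error handling).
  interface Observer<T> {
    void onNext(T value);

    void onCompleted();
  }

  // Assumed behaviour: remember only the most recent value and hand it over on completion.
  static final class LastValueObserver<T> implements Observer<T> {
    private final Consumer<T> callback;
    private T last;

    LastValueObserver(Consumer<T> callback) {
      this.callback = callback;
    }

    @Override
    public void onNext(T value) {
      last = value;
    }

    @Override
    public void onCompleted() {
      callback.accept(last);
    }
  }

  // Assumed behaviour: buffer every value and hand over the whole list on completion.
  static final class AllValuesObserver<T> implements Observer<T> {
    private final Consumer<List<T>> callback;
    private final List<T> values = new ArrayList<>();

    AllValuesObserver(Consumer<List<T>> callback) {
      this.callback = callback;
    }

    @Override
    public void onNext(T value) {
      values.add(value);
    }

    @Override
    public void onCompleted() {
      callback.accept(List.copyOf(values));
    }
  }

  // Replays a fixed list of responses into an observer, then completes it.
  static <T> void stream(Observer<T> observer, List<T> responses) {
    responses.forEach(observer::onNext);
    observer.onCompleted();
  }

  public static void main(String[] args) {
    List<String> responses = List.of("r1", "r2", "r3");
    stream(new LastValueObserver<String>(v -> System.out.println("last: " + v)), responses);
    stream(new AllValuesObserver<String>(v -> System.out.println("all: " + v)), responses);
  }
}
```

   If the real observers behave like this, a regular query would silently drop any earlier responses on the stream, which is why the difference between the two request types deserves an explanation.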



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java:
##########
@@ -86,7 +88,17 @@ public String toExplainString() {
   }
 
   protected String getExplainName() {
-    return EXPLAIN_NAME;
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);

Review Comment:
   Same concern as above: won't this also change the v1 explain plan?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -56,115 +58,151 @@
 
 
 /**
- * This visitor constructs a physical plan of operators from a {@link PlanNode} tree. Note that
- * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
+ * A class used to transform PlanNodes (considered logical) into MultiStageOperator (considered physical).
+ *
+ * Note that this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, OpChainExecutionContext)}
+ * <p><b>Notice</b>: Here <em>physical</em> is used in the context of multi-stage engine, which means it transforms
+ * logical PlanNodes into MultiStageOperator.
+ * Probably another adjective should be used given physical means different things for Calcite and single-stage</p>
  */
-public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+// TODO: rename as PhysicalPlaner or PlanNodeToOperator
+public class PhysicalPlanVisitor {
+
+  private PhysicalPlanVisitor() {
+  }
 
-  private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
+  public static OpChain planToOperators(PlanNode node, OpChainExecutionContext context) {
+    return planToOperators(node, context, (planNode, operator) -> {
+      // Do nothing
+    });
+  }
 
-  public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext context) {
-    MultiStageOperator root = node.visit(INSTANCE, context);
+  /**
+   * Like {@link #planToOperators(PlanNode, OpChainExecutionContext, BiConsumer)} but keeps tracking of the original
+   * PlanNode that created each MultiStageOperator
+   * @param tracker a consumer that will be called each time a MultiStageOperator is created.
+   * @return
+   */
+  public static OpChain planToOperators(PlanNode node, OpChainExecutionContext context,
+      BiConsumer<PlanNode, MultiStageOperator> tracker) {
+    MyVisitor visitor = new MyVisitor(tracker);
+    MultiStageOperator root = node.visit(visitor, context);
+    tracker.accept(node, root);
     return new OpChain(context, root);
   }
 
-  private <T extends PlanNode> MultiStageOperator visit(T node, OpChainExecutionContext context) {
-    if (context.getLeafStageContext() != null && context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
-      ServerPlanRequestContext leafStageContext = context.getLeafStageContext();
-      return new LeafStageTransferableBlockOperator(context, leafStageContext.getServerQueryRequests(),
-          leafStageContext.getLeafStageBoundaryNode().getDataSchema(), leafStageContext.getLeafQueryExecutor(),
-          leafStageContext.getExecutorService());
-    } else {
-      return node.visit(this, context);
+  private static class MyVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+    private final BiConsumer<PlanNode, MultiStageOperator> _tracker;
+
+    public MyVisitor(BiConsumer<PlanNode, MultiStageOperator> tracker) {
+      _tracker = tracker;
     }
-  }
 
-  @Override
-  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
-    if (node.isSort()) {
-      return new SortedMailboxReceiveOperator(context, node);
-    } else {
-      return new MailboxReceiveOperator(context, node);
+    private <T extends PlanNode> MultiStageOperator visit(T node, OpChainExecutionContext context) {
+      MultiStageOperator result;
+      if (context.getLeafStageContext() != null && context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
+        ServerPlanRequestContext leafStageContext = context.getLeafStageContext();
+        result = new LeafStageTransferableBlockOperator(context, leafStageContext.getServerQueryRequests(),
+            leafStageContext.getLeafStageBoundaryNode().getDataSchema(), leafStageContext.getLeafQueryExecutor(),
+            leafStageContext.getExecutorService());
+      } else {
+        result = node.visit(this, context);
+      }
+      _tracker.accept(node, result);
+      return result;
     }
-  }
 
-  @Override
-  public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) {
-    return new MailboxSendOperator(context, visit(node.getInputs().get(0), context), node);
-  }
+    @Override
+    public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
+      if (node.isSort()) {
+        return new SortedMailboxReceiveOperator(context, node);
+      } else {
+        return new MailboxReceiveOperator(context, node);
+      }
+    }
 
-  @Override
-  public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
-    return new AggregateOperator(context, visit(node.getInputs().get(0), context), node);
-  }
+    @Override
+    public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) {
+      return new MailboxSendOperator(context, visit(node.getInputs().get(0), context), node);
+    }
 
-  @Override
-  public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
-    PlanNode input = node.getInputs().get(0);
-    return new WindowAggregateOperator(context, visit(input, context), input.getDataSchema(), node);
-  }
+    @Override
+    public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
+      return new AggregateOperator(context, visit(node.getInputs().get(0), context), node);
+    }
 
-  @Override
-  public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
-    List<MultiStageOperator> inputOperators = new ArrayList<>(setOpNode.getInputs().size());
-    for (PlanNode input : setOpNode.getInputs()) {
-      inputOperators.add(visit(input, context));
+    @Override
+    public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
+      PlanNode input = node.getInputs().get(0);
+      return new WindowAggregateOperator(context, visit(input, context), input.getDataSchema(), node);
     }
-    switch (setOpNode.getSetOpType()) {
-      case UNION:
-        return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-      case INTERSECT:
-        return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
-            setOpNode.getInputs().get(0).getDataSchema())
-            : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-      case MINUS:
-        return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
-            setOpNode.getInputs().get(0).getDataSchema())
-            : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-      default:
-        throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+
+    @Override
+    public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
+      List<MultiStageOperator> inputOperators = new ArrayList<>(setOpNode.getInputs().size());
+      for (PlanNode input : setOpNode.getInputs()) {
+        inputOperators.add(visit(input, context));
+      }
+      switch (setOpNode.getSetOpType()) {
+        case UNION:
+          return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+        case INTERSECT:
+          return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
+              setOpNode.getInputs().get(0).getDataSchema())
+              : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+        case MINUS:
+          return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
+              setOpNode.getInputs().get(0).getDataSchema())
+              : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+        default:
+          throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+      }
     }
-  }
 
-  @Override
-  public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) {
-    throw new UnsupportedOperationException("ExchangeNode should not be visited");
-  }
+    @Override
+    public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) {
+      throw new UnsupportedOperationException("ExchangeNode should not be visited");
+    }
 
-  @Override
-  public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
-    return new FilterOperator(context, visit(node.getInputs().get(0), context), node);
-  }
+    @Override
+    public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
+      return new FilterOperator(context, visit(node.getInputs().get(0), context), node);
+    }
 
-  @Override
-  public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) {
-    List<PlanNode> inputs = node.getInputs();
-    PlanNode left = inputs.get(0);
-    PlanNode right = inputs.get(1);
-    return new HashJoinOperator(context, visit(left, context), left.getDataSchema(), visit(right, context), node);
-  }
+    @Override
+    public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) {
+      List<PlanNode> inputs = node.getInputs();
+      PlanNode left = inputs.get(0);
+      PlanNode right = inputs.get(1);
+      return new HashJoinOperator(context, visit(left, context), left.getDataSchema(), visit(right, context), node);
+    }
 
-  @Override
-  public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
-    PlanNode input = node.getInputs().get(0);
-    return new TransformOperator(context, visit(input, context), input.getDataSchema(), node);
-  }
+    @Override
+    public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
+      PlanNode input = node.getInputs().get(0);
+      return new TransformOperator(context, visit(input, context), input.getDataSchema(), node);
+    }
 
-  @Override
-  public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
-    return new SortOperator(context, visit(node.getInputs().get(0), context), node);
-  }
+    @Override
+    public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
+      return new SortOperator(context, visit(node.getInputs().get(0), context), node);
+    }
 
-  @Override
-  public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) {
-    throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
-  }
+    @Override
+    public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) {
+      throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
+    }
 
-  @Override
-  public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) {
-    return new LiteralValueOperator(context, node);
+    @Override
+    public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) {
+      return new LiteralValueOperator(context, node);
+    }
+
+    @Override
+    public MultiStageOperator visitExplained(ExplainedNode node, OpChainExecutionContext context) {
+      throw new UnsupportedOperationException("Stage node of type ExplainedNode is not supported!");

Review Comment:
   nit: I guess the "Stage node" wording in this message is outdated, since `StageNode` was renamed to `PlanNode` in https://github.com/apache/pinot/pull/10735?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.plan.PinotExplainedRelNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts a {@link PlanNode} into a {@link RelNode}.
+ *
+ * This class is used to convert serialized plan nodes into RelNodes so they can be used when explain with
+ * implementation is requested. Therefore some nodes may be transformed in a way that loses information that is
+ * required to create an actual executable plan but not necessary in order to describe the plan.
+ */
+public final class PlanNodeToRelConverter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PlanNodeToRelConverter.class);
+
+  private PlanNodeToRelConverter() {
+  }
+
+  public static RelNode convert(RelBuilder builder, PlanNode planNode) {
+    ConverterVisitor visitor = new ConverterVisitor(builder);
+    planNode.visit(visitor, null);
+
+    return visitor.build();
+  }
+
+  private static class ConverterVisitor implements PlanNodeVisitor<Void, Void> {
+    private final RelBuilder _builder;
+
+    public ConverterVisitor(RelBuilder builder) {
+      _builder = builder;
+    }
+
+    private void visitChildren(PlanNode node) {
+      node.getInputs().forEach(input -> input.visit(this, null));
+    }
+
+    @Override
+    public Void visitAggregate(AggregateNode node, Void context) {
+      visitChildren(node);
+
+      try {
+        int[] groupKeyArr = node.getGroupKeys().stream().mapToInt(Integer::intValue).toArray();
+        RelBuilder.GroupKey groupKey = _builder.groupKey(groupKeyArr);
+
+        List<RelBuilder.AggCall> aggCalls =
+            node.getAggCalls().stream().map(functionCall -> RexExpressionUtils.toAggCall(_builder, functionCall))
+                .collect(Collectors.toList());
+
+        _builder.aggregate(groupKey, aggCalls);
+      } catch (RuntimeException e) {
+        LOGGER.warn("Failed to convert aggregate node: {}", node, e);
+        _builder.push(new PinotExplainedRelNode(_builder.getCluster(), "UnknownAggregate", Collections.emptyMap(),
+            node.getDataSchema(), readAlreadyPushedChildren(node)));
+      }

Review Comment:
   Wouldn't it be better to fail in these unknown error cases?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java:
##########
@@ -28,27 +28,174 @@
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Sarg;
+import org.apache.calcite.util.TimestampString;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class RexExpressionUtils {
+  public static final Logger LOGGER = LoggerFactory.getLogger(RexExpressionUtils.class);
+
   private RexExpressionUtils() {
   }
 
+  public static RexNode toRexNode(RelBuilder builder, RexExpression rexExpression) {
+    if (rexExpression instanceof RexExpression.InputRef) {
+      return toRexInputRef(builder, (RexExpression.InputRef) rexExpression);
+    } else if (rexExpression instanceof RexExpression.Literal) {
+      return toRexLiteral(builder, (RexExpression.Literal) rexExpression);
+    } else if (rexExpression instanceof RexExpression.FunctionCall) {
+      return toRexCall(builder, (RexExpression.FunctionCall) rexExpression);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexExpression type: " + rexExpression.getClass().getName());
+    }
+  }
+
+  private static RexNode toRexInputRef(RelBuilder builder, RexExpression.InputRef rexExpression) {
+    return builder.field(rexExpression.getIndex());
+  }
+
+  private static RexNode toRexCall(RelBuilder builder, RexExpression.FunctionCall rexExpression) {
+    List<RexExpression> functionOperands = rexExpression.getFunctionOperands();
+    List<RexNode> operands = new ArrayList<>(functionOperands.size());
+    for (RexExpression functionOperand : functionOperands) {
+      operands.add(toRexNode(builder, functionOperand));
+    }
+    // TODO: This needs to be improved.

Review Comment:
   What exactly needs to be improved / changed here? Also in `getAggFunction`.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java:
##########
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+/**
+ * A utility class used to sort the plan nodes in a deterministic order.
+ *
+ * Any comparator can be passed to the sort method to sort the plan nodes, although the default comparator
+ * is used to sort the plan nodes based on the type and the attributes of the node.
+ */
+public class PlanNodeSorter {

Review Comment:
   I didn't quite get the purpose of this plan node sorter utility. In which 
cases would we get a non-deterministic order of plan nodes (necessitating this 
plan node sorter)?
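
   For reference, the ordering the Javadoc describes (type first, then attributes) can be sketched roughly as below. The `Node` class and its fields are hypothetical stand-ins rather than the PR's `PlanNode` API, and the sketch does not address whether reordering inputs is safe for every node type:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DeterministicOrderSketch {
  // Hypothetical stand-in for a plan node: a type name, one attribute string, and child inputs.
  static final class Node {
    final String type;
    final String attributes;
    final List<Node> inputs;

    Node(String type, String attributes, List<Node> inputs) {
      this.type = type;
      this.attributes = attributes;
      this.inputs = inputs;
    }
  }

  // Assumed default ordering: compare by node type, then break ties by attributes.
  static final Comparator<Node> BY_TYPE_THEN_ATTRIBUTES =
      Comparator.comparing((Node n) -> n.type).thenComparing(n -> n.attributes);

  // Returns a copy of the tree in which every node's inputs are sorted recursively.
  static Node sort(Node node, Comparator<Node> comparator) {
    List<Node> sortedInputs = new ArrayList<>();
    for (Node input : node.inputs) {
      sortedInputs.add(sort(input, comparator));
    }
    sortedInputs.sort(comparator);
    return new Node(node.type, node.attributes, sortedInputs);
  }
}
```

   With something like this, two plans that differ only in the order of their inputs would sort into the same tree.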



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -256,4 +262,66 @@ private Map<String, String> consolidateMetadata(Map<String, String> customProper
   public void cancel(long requestId) {
     _opChainScheduler.cancel(requestId);
   }
+
+  public StagePlan explainQuery(
+      WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> requestMetadata) {
+
+    if (!workerMetadata.isLeafStageWorker()) {
+      LOGGER.debug("Explain query on intermediate stages is a NOOP");
+      return stagePlan;
+    }
+    long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
+    long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+
+    StageMetadata stageMetadata = stagePlan.getStageMetadata();
+    Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
+
+    if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
+      // TODO: Support pipeline breakers before merging this feature.
+      LOGGER.error("Pipeline breaker is not supported in explain query");
+      return stagePlan;
+    }

Review Comment:
   I guess we're not going to support queries with pipeline breakers in this 
new explain plan initially, right? What do we want to show in those cases?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

