yashmayya commented on code in PR #13733:
URL: https://github.com/apache/pinot/pull/13733#discussion_r1750199715
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -56,115 +58,151 @@
/**
- * This visitor constructs a physical plan of operators from a {@link
PlanNode} tree. Note that
- * this works only for the intermediate stage nodes, leaf stage nodes are
expected to compile into
+ * A class used to transform PlanNodes (considered logical) into
MultiStageOperator (considered physical).
+ *
+ * Note that this works only for the intermediate stage nodes, leaf stage
nodes are expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode,
OpChainExecutionContext)}
+ * <p><b>Notice</b>: Here <em>physical</em> is used in the context of
multi-stage engine, which means it transforms
+ * logical PlanNodes into MultiStageOperator.
+ * Probably another adjective should be used given physical means different
things for Calcite and single-stage</p>
*/
-public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+// TODO: rename as PhysicalPlaner or PlanNodeToOperator
+public class PhysicalPlanVisitor {
+
+ private PhysicalPlanVisitor() {
+ }
- private static final PhysicalPlanVisitor INSTANCE = new
PhysicalPlanVisitor();
+ public static OpChain planToOperators(PlanNode node, OpChainExecutionContext
context) {
+ return planToOperators(node, context, (planNode, operator) -> {
+ // Do nothing
+ });
+ }
- public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext
context) {
- MultiStageOperator root = node.visit(INSTANCE, context);
+ /**
+ * Like {@link #planToOperators(PlanNode, OpChainExecutionContext,
BiConsumer)} but keeps tracking of the original
+ * PlanNode that created each MultiStageOperator
+ * @param tracker a consumer that will be called each time a
MultiStageOperator is created.
+ * @return
+ */
+ public static OpChain planToOperators(PlanNode node, OpChainExecutionContext
context,
+ BiConsumer<PlanNode, MultiStageOperator> tracker) {
+ MyVisitor visitor = new MyVisitor(tracker);
+ MultiStageOperator root = node.visit(visitor, context);
+ tracker.accept(node, root);
return new OpChain(context, root);
}
- private <T extends PlanNode> MultiStageOperator visit(T node,
OpChainExecutionContext context) {
- if (context.getLeafStageContext() != null &&
context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
- ServerPlanRequestContext leafStageContext =
context.getLeafStageContext();
- return new LeafStageTransferableBlockOperator(context,
leafStageContext.getServerQueryRequests(),
- leafStageContext.getLeafStageBoundaryNode().getDataSchema(),
leafStageContext.getLeafQueryExecutor(),
- leafStageContext.getExecutorService());
- } else {
- return node.visit(this, context);
+  private static class MyVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
Review Comment:
Was this change done to accommodate the `PlanNode` <-> `MultiStageOperator`
tracking mechanism? Couldn't we just use a static factory method instead? Even
if not, `MyVisitor` doesn't really convey the purpose very well (although it's
doing what `PhysicalPlanVisitor` itself was doing earlier, and since it's now an
inner class maybe we could just name it something like `Visitor`).
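For the static factory idea, something along these lines is what I had in mind -- just a rough
sketch reusing the names from this PR, not tested:

```java
// Hypothetical: keep PhysicalPlanVisitor itself as the visitor and carry the tracker
// as instance state, so the extra inner class isn't needed.
public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
  private final BiConsumer<PlanNode, MultiStageOperator> _tracker;

  private PhysicalPlanVisitor(BiConsumer<PlanNode, MultiStageOperator> tracker) {
    _tracker = tracker;
  }

  public static OpChain planToOperators(PlanNode node, OpChainExecutionContext context,
      BiConsumer<PlanNode, MultiStageOperator> tracker) {
    // A fresh visitor per plan carries the tracker, so the previous singleton INSTANCE goes away.
    PhysicalPlanVisitor visitor = new PhysicalPlanVisitor(tracker);
    MultiStageOperator root = node.visit(visitor, context);
    tracker.accept(node, root);
    return new OpChain(context, root);
  }

  // ... visitMailboxReceive, visitAggregate, etc. stay exactly as in this PR ...
}
```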
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java:
##########
@@ -691,4 +662,132 @@ private void addPrunerStats(InstanceResponseBlock
instanceResponse, SegmentPrune
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
String.valueOf(prunerStats.getValuePruned()));
}
+
+ private List<IndexSegment> selectSegments(List<IndexSegment> indexSegments,
QueryContext queryContext,
+ TimerContext timerContext, ExecutorService executorService,
SegmentPrunerStatistics prunerStats) {
+ List<IndexSegment> selectedSegments;
+ if ((queryContext.getFilter() != null &&
queryContext.getFilter().isConstantFalse()) || (
+ queryContext.getHavingFilter() != null &&
queryContext.getHavingFilter().isConstantFalse())) {
+ selectedSegments = Collections.emptyList();
+ } else {
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ selectedSegments = _segmentPrunerService.prune(indexSegments,
queryContext, prunerStats, executorService);
+ segmentPruneTimer.stopAndRecord();
+ }
+ return selectedSegments;
+ }
+
+ private Plan planCombineQuery(QueryContext queryContext, TimerContext
timerContext, ExecutorService executorService,
+ @Nullable ResultsBlockStreamer streamer, List<SegmentContext>
selectedSegmentContexts) {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+
+ Plan queryPlan;
+ if (streamer != null) {
+ queryPlan =
_planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext,
executorService,
+ streamer, _serverMetrics);
+ } else {
+ queryPlan = _planMaker.makeInstancePlan(selectedSegmentContexts,
queryContext, executorService, _serverMetrics);
+ }
+ planBuildTimer.stopAndRecord();
+ return queryPlan;
+ }
+
+  private InstanceResponseBlock execute(TableDataManager tableDataManager,
+      List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext,
+      ExecutorService executorService, ResultsBlockStreamer streamer, boolean enableStreaming,
+      List<IndexSegment> selectedSegments, List<SegmentContext> selectedSegmentContexts)
Review Comment:
nit: couple of unused parameters here
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java:
##########
@@ -77,6 +78,6 @@ public int getNumDocsScanned() {
@Override
protected String getExplainName() {
- return EXPLAIN_NAME;
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);
Review Comment:
This will also change the v1 explain plan, right? I guess we want to avoid that?
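For context, a tiny standalone example of what that Guava conversion does (the constant value
here is made up, just to illustrate):

```java
import com.google.common.base.CaseFormat;

public class ExplainNameCase {
  public static void main(String[] args) {
    // Hypothetical EXPLAIN_NAME value, not necessarily the real one.
    String explainName = "SELECT_PARTIAL_ORDER_BY_ASC";
    // Prints "SelectPartialOrderByAsc", so the v1 explain output would show the
    // camel-case form instead of the original upper-underscore form.
    System.out.println(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, explainName));
  }
}
```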
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.plan.PinotExplainedRelNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts a {@link PlanNode} into a {@link RelNode}.
+ *
+ * This class is used to convert serialized plan nodes into RelNodes so they
can be used when explain with
+ * implementation is requested. Therefore some nodes may be transformed in a
way that loses information that is
+ * required to create an actual executable plan but not necessary in order to
describe the plan.
+ */
+public final class PlanNodeToRelConverter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PlanNodeToRelConverter.class);
+
+ private PlanNodeToRelConverter() {
+ }
+
+ public static RelNode convert(RelBuilder builder, PlanNode planNode) {
+ ConverterVisitor visitor = new ConverterVisitor(builder);
+ planNode.visit(visitor, null);
+
+ return visitor.build();
+ }
+
+  private static class ConverterVisitor implements PlanNodeVisitor<Void, Void> {
Review Comment:
Looks like the static inner class does all the heavy lifting here and the
outer class doesn't really have any logic -- why not simply make
`PlanNodeToRelConverter` itself implement the `PlanNodeVisitor` interface and
get rid of this static inner class?
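Roughly this shape (just a sketch of the structure, keeping the `RelBuilder` as constructor
state like today):

```java
// Hypothetical restructuring: the converter is the visitor.
public final class PlanNodeToRelConverter implements PlanNodeVisitor<Void, Void> {
  private static final Logger LOGGER = LoggerFactory.getLogger(PlanNodeToRelConverter.class);

  private final RelBuilder _builder;

  private PlanNodeToRelConverter(RelBuilder builder) {
    _builder = builder;
  }

  public static RelNode convert(RelBuilder builder, PlanNode planNode) {
    PlanNodeToRelConverter converter = new PlanNodeToRelConverter(builder);
    planNode.visit(converter, null);
    return converter.build();
  }

  // visitAggregate, visitFilter, build(), etc. move up from ConverterVisitor unchanged.
}
```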
##########
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java:
##########
@@ -155,11 +157,26 @@ public static QueryContext getQueryContext(PinotQuery
pinotQuery) {
}
}
+ ExplainMode explainMode;
+ if (!pinotQuery.isExplain()) {
+ explainMode = ExplainMode.NONE;
+ } else if (isUsingV1(pinotQuery)) {
+ explainMode = ExplainMode.NODE;
+ } else {
+ explainMode = ExplainMode.DESCRIPTION;
+ }
+
return new
QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
.setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
.setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
- .setExplain(pinotQuery.isExplain()).build();
+ .setExplain(explainMode).build();
+ }
+
+ private static boolean isUsingV1(PinotQuery pinotQuery) {
Review Comment:
Shouldn't this be `isUsingV2` or `isUsingMultistageEngine`? Also, couldn't we
just use `QueryOptionUtils::isUseMultistageEngine` instead?
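i.e. something like this (assuming `isUseMultistageEngine` takes the query options map -- and
the call site's branches would need to flip if the helper's semantics flip with the rename):

```java
// Hypothetical rename, delegating to the existing util instead of duplicating the check.
private static boolean isUsingMultistageEngine(PinotQuery pinotQuery) {
  return QueryOptionUtils.isUseMultistageEngine(pinotQuery.getQueryOptions());
}
```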
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java:
##########
@@ -50,12 +51,17 @@ public ManagedChannel getChannel() {
}
public void submit(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
-      Consumer<AsyncQueryDispatchResponse> callback) {
-    _dispatchStub.withDeadline(deadline).submit(request, new DispatchObserver(virtualServer, callback));
+      Consumer<AsyncResponse<Worker.QueryResponse>> callback) {
+    _dispatchStub.withDeadline(deadline).submit(request, new LastValueDispatchObserver<>(virtualServer, callback));
}
public void cancel(long requestId) {
Worker.CancelRequest cancelRequest =
Worker.CancelRequest.newBuilder().setRequestId(requestId).build();
_dispatchStub.cancel(cancelRequest, NO_OP_CANCEL_STREAM_OBSERVER);
}
+
+  public void explain(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
+      Consumer<AsyncResponse<List<Worker.ExplainResponse>>> callback) {
+    _dispatchStub.withDeadline(deadline).explain(request, new AllValuesDispatchObserver<>(virtualServer, callback));
Review Comment:
I didn't quite follow why we only care about the last value from the broker
<-> server stream response in the case of a regular query request, but about
all values from the stream in the case of an explain query request?
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java:
##########
@@ -86,7 +88,17 @@ public String toExplainString() {
}
protected String getExplainName() {
- return EXPLAIN_NAME;
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);
Review Comment:
Same concern as above -- this will also change the v1 explain plan?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -56,115 +58,151 @@
/**
- * This visitor constructs a physical plan of operators from a {@link
PlanNode} tree. Note that
- * this works only for the intermediate stage nodes, leaf stage nodes are
expected to compile into
+ * A class used to transform PlanNodes (considered logical) into
MultiStageOperator (considered physical).
+ *
+ * Note that this works only for the intermediate stage nodes, leaf stage
nodes are expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode,
OpChainExecutionContext)}
+ * <p><b>Notice</b>: Here <em>physical</em> is used in the context of
multi-stage engine, which means it transforms
+ * logical PlanNodes into MultiStageOperator.
+ * Probably another adjective should be used given physical means different
things for Calcite and single-stage</p>
*/
-public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+// TODO: rename as PhysicalPlaner or PlanNodeToOperator
+public class PhysicalPlanVisitor {
+
+ private PhysicalPlanVisitor() {
+ }
- private static final PhysicalPlanVisitor INSTANCE = new
PhysicalPlanVisitor();
+ public static OpChain planToOperators(PlanNode node, OpChainExecutionContext
context) {
+ return planToOperators(node, context, (planNode, operator) -> {
+ // Do nothing
+ });
+ }
- public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext
context) {
- MultiStageOperator root = node.visit(INSTANCE, context);
+ /**
+ * Like {@link #planToOperators(PlanNode, OpChainExecutionContext,
BiConsumer)} but keeps tracking of the original
+ * PlanNode that created each MultiStageOperator
+ * @param tracker a consumer that will be called each time a
MultiStageOperator is created.
+ * @return
+ */
+ public static OpChain planToOperators(PlanNode node, OpChainExecutionContext
context,
+ BiConsumer<PlanNode, MultiStageOperator> tracker) {
+ MyVisitor visitor = new MyVisitor(tracker);
+ MultiStageOperator root = node.visit(visitor, context);
+ tracker.accept(node, root);
return new OpChain(context, root);
}
- private <T extends PlanNode> MultiStageOperator visit(T node,
OpChainExecutionContext context) {
- if (context.getLeafStageContext() != null &&
context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
- ServerPlanRequestContext leafStageContext =
context.getLeafStageContext();
- return new LeafStageTransferableBlockOperator(context,
leafStageContext.getServerQueryRequests(),
- leafStageContext.getLeafStageBoundaryNode().getDataSchema(),
leafStageContext.getLeafQueryExecutor(),
- leafStageContext.getExecutorService());
- } else {
- return node.visit(this, context);
+ private static class MyVisitor implements
PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
+ private final BiConsumer<PlanNode, MultiStageOperator> _tracker;
+
+ public MyVisitor(BiConsumer<PlanNode, MultiStageOperator> tracker) {
+ _tracker = tracker;
}
- }
- @Override
- public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node,
OpChainExecutionContext context) {
- if (node.isSort()) {
- return new SortedMailboxReceiveOperator(context, node);
- } else {
- return new MailboxReceiveOperator(context, node);
+ private <T extends PlanNode> MultiStageOperator visit(T node,
OpChainExecutionContext context) {
+ MultiStageOperator result;
+ if (context.getLeafStageContext() != null &&
context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
+ ServerPlanRequestContext leafStageContext =
context.getLeafStageContext();
+ result = new LeafStageTransferableBlockOperator(context,
leafStageContext.getServerQueryRequests(),
+ leafStageContext.getLeafStageBoundaryNode().getDataSchema(),
leafStageContext.getLeafQueryExecutor(),
+ leafStageContext.getExecutorService());
+ } else {
+ result = node.visit(this, context);
+ }
+ _tracker.accept(node, result);
+ return result;
}
- }
- @Override
- public MultiStageOperator visitMailboxSend(MailboxSendNode node,
OpChainExecutionContext context) {
- return new MailboxSendOperator(context, visit(node.getInputs().get(0),
context), node);
- }
+ @Override
+ public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node,
OpChainExecutionContext context) {
+ if (node.isSort()) {
+ return new SortedMailboxReceiveOperator(context, node);
+ } else {
+ return new MailboxReceiveOperator(context, node);
+ }
+ }
- @Override
- public MultiStageOperator visitAggregate(AggregateNode node,
OpChainExecutionContext context) {
- return new AggregateOperator(context, visit(node.getInputs().get(0),
context), node);
- }
+ @Override
+ public MultiStageOperator visitMailboxSend(MailboxSendNode node,
OpChainExecutionContext context) {
+ return new MailboxSendOperator(context, visit(node.getInputs().get(0),
context), node);
+ }
- @Override
- public MultiStageOperator visitWindow(WindowNode node,
OpChainExecutionContext context) {
- PlanNode input = node.getInputs().get(0);
- return new WindowAggregateOperator(context, visit(input, context),
input.getDataSchema(), node);
- }
+ @Override
+ public MultiStageOperator visitAggregate(AggregateNode node,
OpChainExecutionContext context) {
+ return new AggregateOperator(context, visit(node.getInputs().get(0),
context), node);
+ }
- @Override
- public MultiStageOperator visitSetOp(SetOpNode setOpNode,
OpChainExecutionContext context) {
- List<MultiStageOperator> inputOperators = new
ArrayList<>(setOpNode.getInputs().size());
- for (PlanNode input : setOpNode.getInputs()) {
- inputOperators.add(visit(input, context));
+ @Override
+ public MultiStageOperator visitWindow(WindowNode node,
OpChainExecutionContext context) {
+ PlanNode input = node.getInputs().get(0);
+ return new WindowAggregateOperator(context, visit(input, context),
input.getDataSchema(), node);
}
- switch (setOpNode.getSetOpType()) {
- case UNION:
- return new UnionOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
- case INTERSECT:
- return setOpNode.isAll() ? new IntersectAllOperator(context,
inputOperators,
- setOpNode.getInputs().get(0).getDataSchema())
- : new IntersectOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
- case MINUS:
- return setOpNode.isAll() ? new MinusAllOperator(context,
inputOperators,
- setOpNode.getInputs().get(0).getDataSchema())
- : new MinusOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
- default:
- throw new IllegalStateException("Unsupported SetOpType: " +
setOpNode.getSetOpType());
+
+ @Override
+ public MultiStageOperator visitSetOp(SetOpNode setOpNode,
OpChainExecutionContext context) {
+ List<MultiStageOperator> inputOperators = new
ArrayList<>(setOpNode.getInputs().size());
+ for (PlanNode input : setOpNode.getInputs()) {
+ inputOperators.add(visit(input, context));
+ }
+ switch (setOpNode.getSetOpType()) {
+ case UNION:
+ return new UnionOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
+ case INTERSECT:
+ return setOpNode.isAll() ? new IntersectAllOperator(context,
inputOperators,
+ setOpNode.getInputs().get(0).getDataSchema())
+ : new IntersectOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
+ case MINUS:
+ return setOpNode.isAll() ? new MinusAllOperator(context,
inputOperators,
+ setOpNode.getInputs().get(0).getDataSchema())
+ : new MinusOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
+ default:
+ throw new IllegalStateException("Unsupported SetOpType: " +
setOpNode.getSetOpType());
+ }
}
- }
- @Override
- public MultiStageOperator visitExchange(ExchangeNode exchangeNode,
OpChainExecutionContext context) {
- throw new UnsupportedOperationException("ExchangeNode should not be
visited");
- }
+ @Override
+ public MultiStageOperator visitExchange(ExchangeNode exchangeNode,
OpChainExecutionContext context) {
+ throw new UnsupportedOperationException("ExchangeNode should not be
visited");
+ }
- @Override
- public MultiStageOperator visitFilter(FilterNode node,
OpChainExecutionContext context) {
- return new FilterOperator(context, visit(node.getInputs().get(0),
context), node);
- }
+ @Override
+ public MultiStageOperator visitFilter(FilterNode node,
OpChainExecutionContext context) {
+ return new FilterOperator(context, visit(node.getInputs().get(0),
context), node);
+ }
- @Override
- public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext
context) {
- List<PlanNode> inputs = node.getInputs();
- PlanNode left = inputs.get(0);
- PlanNode right = inputs.get(1);
- return new HashJoinOperator(context, visit(left, context),
left.getDataSchema(), visit(right, context), node);
- }
+ @Override
+ public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext
context) {
+ List<PlanNode> inputs = node.getInputs();
+ PlanNode left = inputs.get(0);
+ PlanNode right = inputs.get(1);
+ return new HashJoinOperator(context, visit(left, context),
left.getDataSchema(), visit(right, context), node);
+ }
- @Override
- public MultiStageOperator visitProject(ProjectNode node,
OpChainExecutionContext context) {
- PlanNode input = node.getInputs().get(0);
- return new TransformOperator(context, visit(input, context),
input.getDataSchema(), node);
- }
+ @Override
+ public MultiStageOperator visitProject(ProjectNode node,
OpChainExecutionContext context) {
+ PlanNode input = node.getInputs().get(0);
+ return new TransformOperator(context, visit(input, context),
input.getDataSchema(), node);
+ }
- @Override
- public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext
context) {
- return new SortOperator(context, visit(node.getInputs().get(0), context),
node);
- }
+ @Override
+ public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext
context) {
+ return new SortOperator(context, visit(node.getInputs().get(0),
context), node);
+ }
- @Override
- public MultiStageOperator visitTableScan(TableScanNode node,
OpChainExecutionContext context) {
- throw new UnsupportedOperationException("Stage node of type TableScanNode
is not supported!");
- }
+ @Override
+ public MultiStageOperator visitTableScan(TableScanNode node,
OpChainExecutionContext context) {
+ throw new UnsupportedOperationException("Stage node of type
TableScanNode is not supported!");
+ }
- @Override
- public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext
context) {
- return new LiteralValueOperator(context, node);
+ @Override
+ public MultiStageOperator visitValue(ValueNode node,
OpChainExecutionContext context) {
+ return new LiteralValueOperator(context, node);
+ }
+
+ @Override
+    public MultiStageOperator visitExplained(ExplainedNode node, OpChainExecutionContext context) {
+      throw new UnsupportedOperationException("Stage node of type ExplainedNode is not supported!");
Review Comment:
nit: I guess this is outdated since StageNode was renamed to PlanNode in
https://github.com/apache/pinot/pull/10735?
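e.g. the message could now read something like:

```java
throw new UnsupportedOperationException("Plan node of type ExplainedNode is not supported!");
```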
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.plan.PinotExplainedRelNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts a {@link PlanNode} into a {@link RelNode}.
+ *
+ * This class is used to convert serialized plan nodes into RelNodes so they
can be used when explain with
+ * implementation is requested. Therefore some nodes may be transformed in a
way that loses information that is
+ * required to create an actual executable plan but not necessary in order to
describe the plan.
+ */
+public final class PlanNodeToRelConverter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PlanNodeToRelConverter.class);
+
+ private PlanNodeToRelConverter() {
+ }
+
+ public static RelNode convert(RelBuilder builder, PlanNode planNode) {
+ ConverterVisitor visitor = new ConverterVisitor(builder);
+ planNode.visit(visitor, null);
+
+ return visitor.build();
+ }
+
+ private static class ConverterVisitor implements PlanNodeVisitor<Void, Void>
{
+ private final RelBuilder _builder;
+
+ public ConverterVisitor(RelBuilder builder) {
+ _builder = builder;
+ }
+
+ private void visitChildren(PlanNode node) {
+ node.getInputs().forEach(input -> input.visit(this, null));
+ }
+
+ @Override
+ public Void visitAggregate(AggregateNode node, Void context) {
+ visitChildren(node);
+
+      try {
+        int[] groupKeyArr = node.getGroupKeys().stream().mapToInt(Integer::intValue).toArray();
+        RelBuilder.GroupKey groupKey = _builder.groupKey(groupKeyArr);
+
+        List<RelBuilder.AggCall> aggCalls =
+            node.getAggCalls().stream().map(functionCall -> RexExpressionUtils.toAggCall(_builder, functionCall))
+                .collect(Collectors.toList());
+
+        _builder.aggregate(groupKey, aggCalls);
+      } catch (RuntimeException e) {
+        LOGGER.warn("Failed to convert aggregate node: {}", node, e);
+        _builder.push(new PinotExplainedRelNode(_builder.getCluster(), "UnknownAggregate", Collections.emptyMap(),
+            node.getDataSchema(), readAlreadyPushedChildren(node)));
+      }
Review Comment:
Wouldn't it be better to fail in these unknown error cases?
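e.g. the catch block (here and in the other visit methods) could just rethrow with some context
instead of pushing an `Unknown*` placeholder node -- sketch only:

```java
} catch (RuntimeException e) {
  throw new IllegalStateException("Failed to convert aggregate node: " + node, e);
}
```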
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java:
##########
@@ -28,27 +28,174 @@
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
+import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Sarg;
+import org.apache.calcite.util.TimestampString;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({"rawtypes", "unchecked"})
public class RexExpressionUtils {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(RexExpressionUtils.class);
+
private RexExpressionUtils() {
}
+ public static RexNode toRexNode(RelBuilder builder, RexExpression
rexExpression) {
+ if (rexExpression instanceof RexExpression.InputRef) {
+ return toRexInputRef(builder, (RexExpression.InputRef) rexExpression);
+ } else if (rexExpression instanceof RexExpression.Literal) {
+ return toRexLiteral(builder, (RexExpression.Literal) rexExpression);
+ } else if (rexExpression instanceof RexExpression.FunctionCall) {
+ return toRexCall(builder, (RexExpression.FunctionCall) rexExpression);
+ } else {
+ throw new IllegalArgumentException("Unsupported RexExpression type: " +
rexExpression.getClass().getName());
+ }
+ }
+
+ private static RexNode toRexInputRef(RelBuilder builder,
RexExpression.InputRef rexExpression) {
+ return builder.field(rexExpression.getIndex());
+ }
+
+ private static RexNode toRexCall(RelBuilder builder,
RexExpression.FunctionCall rexExpression) {
+ List<RexExpression> functionOperands = rexExpression.getFunctionOperands();
+ List<RexNode> operands = new ArrayList<>(functionOperands.size());
+ for (RexExpression functionOperand : functionOperands) {
+ operands.add(toRexNode(builder, functionOperand));
+ }
+ // TODO: This needs to be improved.
Review Comment:
What exactly needs to be improved / changed here? Also in `getAggFunction`.
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeSorter.java:
##########
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+/**
+ * A utility class used to sort the plan nodes in a deterministic order.
+ *
+ * Any comparator can be passed to the sort method to sort the plan nodes,
although the default comparator
+ * is used to sort the plan nodes based on the type and the attributes of the
node.
+ */
+public class PlanNodeSorter {
Review Comment:
I didn't quite get the purpose of this plan node sorter utility. In which
cases would we get a non-deterministic order of plan nodes (necessitating this
plan node sorter)?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -256,4 +262,66 @@ private Map<String, String>
consolidateMetadata(Map<String, String> customProper
public void cancel(long requestId) {
_opChainScheduler.cancel(requestId);
}
+
+ public StagePlan explainQuery(
+ WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String>
requestMetadata) {
+
+ if (!workerMetadata.isLeafStageWorker()) {
+ LOGGER.debug("Explain query on intermediate stages is a NOOP");
+ return stagePlan;
+ }
+ long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
+ long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ long deadlineMs = System.currentTimeMillis() + timeoutMs;
+
+ StageMetadata stageMetadata = stagePlan.getStageMetadata();
+ Map<String, String> opChainMetadata =
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
+
+ if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
+ // TODO: Support pipeline breakers before merging this feature.
+ LOGGER.error("Pipeline breaker is not supported in explain query");
+ return stagePlan;
+ }
Review Comment:
I guess we're not going to support queries with pipeline breakers with this
new explain plan initially, right? What do we want to show in those cases?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]