yashmayya commented on code in PR #13733:
URL: https://github.com/apache/pinot/pull/13733#discussion_r1741452894

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1192,6 +1209,9 @@ public enum WindowOverFlowMode {
     public static class PlanVersions {
       public static final int V1 = 1;
     }
+
+    public static final String ASK_SERVERS_FOR_EXPLAIN_PLAN = "pinot.query.explain.ask.servers";

Review Comment:
   I think we should include an indication that this config is only for the multi-stage engine? Also I'm wondering if users will be able to make sense of "ask servers" without extensively consulting the documentation. What do you think about `include.physical` (or something similar) instead of `ask.servers`?

##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -193,3 +194,29 @@ message WindowNode {
   int32 upperBound = 6;
   repeated Literal constants = 7;
 }
+
+message ExplainNode {
+  string type = 1;
+  map<string, AttributeValue> attributes = 2;
+
+  message AttributeValue {
+    MergeType mergeType = 1;
+
+    oneof value {
+      string string = 2;
+      int64 long = 3;
+      bool bool = 4;
+      string json = 5;

Review Comment:
   Why is this called `json`? It seems like we're using this field to store collections of strings serialized into a byte array via Jackson's serializer for some reason I didn't quite follow.

##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -32,6 +32,10 @@ public class DatabaseUtils {
   private DatabaseUtils() {
   }

+  public static String[] splitTableName(String tableName) {

Review Comment:
   nit: can we have a small Javadoc demonstrating intended usage through an example? IIUC this is to convert `databaseName.tableName` to [`databaseName`, `tableName`] although I don't fully follow why this is needed or used in the `RelBuilder` call.
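To make the Javadoc request concrete, here is a minimal sketch of the kind of documented example it could contain. The class `SplitTableNameSketch` and the method body are hypothetical stand-ins, not the PR's implementation; splitting only on the first `.` and returning a one-element array for a name without a database prefix are assumptions made for illustration.

```java
import java.util.Arrays;

public class SplitTableNameSketch {
  /**
   * Hypothetical stand-in for the method under review, not Pinot's implementation.
   *
   * Examples (assumed behavior):
   *   "db1.mytable" is split into ["db1", "mytable"]
   *   "mytable"     is returned as ["mytable"] (no database prefix)
   */
  public static String[] splitTableName(String tableName) {
    // Assumption: only the first '.' separates the database from the table name.
    int dot = tableName.indexOf('.');
    if (dot < 0) {
      return new String[]{tableName};
    }
    return new String[]{tableName.substring(0, dot), tableName.substring(dot + 1)};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(splitTableName("db1.mytable")));
    System.out.println(Arrays.toString(splitTableName("mytable")));
  }
}
```

Whatever the real method does for names with no prefix or with more than one `.`, stating it in the Javadoc would answer the question of how the result is meant to be consumed.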
##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -193,3 +194,29 @@ message WindowNode {
   int32 upperBound = 6;
   repeated Literal constants = 7;
 }
+
+message ExplainNode {
+  string type = 1;
+  map<string, AttributeValue> attributes = 2;
+
+  message AttributeValue {
+    MergeType mergeType = 1;
+
+    oneof value {
+      string string = 2;
+      int64 long = 3;
+      bool bool = 4;
+      string json = 5;
+    };
+
+    enum MergeType {
+      // Longs will be added, other types will behave like IDEMPOTENT.
+      DEFAULT = 0;
+      // When being merge, values must be the same. Otherwise cannot be merged.
+      IDEMPOTENT = 1;
+      // When being merge, if values are different, the value can be ignored.

Review Comment:
   Can we also include a comment on what "ignore" means in this context - i.e., what will be the result of merging two explain nodes that have an attribute value of merge type `IGNORABLE` with different values?

##########
pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan.
+ *
+ * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage
+ * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send
+ * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format).
+ *
+ * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that
+ * leaf operator is where nodes are compiled into single-stage physical operators
+ * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes
+ * are being used or not.
+ *
+ * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite.
+ * <b>They are not meant to be executed</b>.
+ * Instead, when physical explain is required, the broker ask for the final plan to each server. They return
+ * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then
+ * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding
+ * PinotExplainedRelNode.
+ */
+public class PinotExplainedRelNode extends AbstractRelNode {
+
+  /**
+   * The name of this node, whose role is like a title in the explain plan.
+   */
+  private final String _type;
+  private final Map<String, Plan.ExplainNode.AttributeValue> _attributes;
+  private final List<RelNode> _inputs;
+  private final DataSchema _dataSchema;
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,

Review Comment:
   Also looks like most (all) call-sites are passing in an empty trait set, maybe we can add an additional constructor that uses an empty trait set by default?

##########
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java:
##########
@@ -528,7 +538,16 @@ public Builder setExpressionOverrideHints(Map<ExpressionContext, ExpressionConte
       return this;
     }

+    /**
+     * @deprecated Use {@link #setExplain(ExplainMode)} instead.
+     */
+    @Deprecated

Review Comment:
   Isn't this an internal class / API? Can't we directly remove after migrating all usages instead of deprecating? Seems like unnecessary cruft to leave around unless I'm missing something regarding the usage of `QueryContext`.

##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java:
##########
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.List;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE;
+  private static final String DATABASE_NAME = "db1";
+  private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." + DEFAULT_TABLE_NAME;
+  private String _tableName = DEFAULT_TABLE_NAME;
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }

Review Comment:
   Isn't this just using the default schema?
##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -193,3 +194,29 @@ message WindowNode {
   int32 upperBound = 6;
   repeated Literal constants = 7;
 }
+
+message ExplainNode {
+  string type = 1;

Review Comment:
   Answering my own question but after reviewing more parts of this PR, it doesn't seem to make sense to make it an enum based on the sheer number of different possible types.

##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java:
##########
@@ -248,6 +249,11 @@ public StringBuilder visitValue(ValueNode node, Context context) {
     return appendInfo(node, context);
   }

+  @Override
+  public StringBuilder visitExplained(ExplainedNode node, Context context) {
+    return appendInfo(node, context);

Review Comment:
   We should never see an `ExplainedNode` in a "physical" explain plan if I understand correctly?

##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java:
##########
@@ -196,27 +201,135 @@ public int hashCode() {
     return EqualityUtils.hashCodeOf(Arrays.hashCode(_columnNames), Arrays.hashCode(_columnDataTypes));
   }

+  public RelDataType toRelDataType(RelDataTypeFactory typeFactory) {
+    List<RelDataType> columnTypes = new ArrayList<>(_columnDataTypes.length);
+    for (ColumnDataType columnDataType : _columnDataTypes) {
+      columnTypes.add(columnDataType.toType(typeFactory));
+    }
+    return typeFactory.createStructType(columnTypes, Arrays.asList(_columnNames));
+  }
+
   public enum ColumnDataType {
-    INT(NullValuePlaceHolder.INT),
-    LONG(NullValuePlaceHolder.LONG),
-    FLOAT(NullValuePlaceHolder.FLOAT),
-    DOUBLE(NullValuePlaceHolder.DOUBLE),
-    BIG_DECIMAL(NullValuePlaceHolder.BIG_DECIMAL),
-    BOOLEAN(INT, NullValuePlaceHolder.INT),
-    TIMESTAMP(LONG, NullValuePlaceHolder.LONG),
-    STRING(NullValuePlaceHolder.STRING),
-    JSON(STRING, NullValuePlaceHolder.STRING),
-    BYTES(NullValuePlaceHolder.INTERNAL_BYTES),
-    OBJECT(null),
-    INT_ARRAY(NullValuePlaceHolder.INT_ARRAY),
-    LONG_ARRAY(NullValuePlaceHolder.LONG_ARRAY),
-    FLOAT_ARRAY(NullValuePlaceHolder.FLOAT_ARRAY),
-    DOUBLE_ARRAY(NullValuePlaceHolder.DOUBLE_ARRAY),
-    BOOLEAN_ARRAY(INT_ARRAY, NullValuePlaceHolder.INT_ARRAY),
-    TIMESTAMP_ARRAY(LONG_ARRAY, NullValuePlaceHolder.LONG_ARRAY),
-    STRING_ARRAY(NullValuePlaceHolder.STRING_ARRAY),
-    BYTES_ARRAY(NullValuePlaceHolder.BYTES_ARRAY),
-    UNKNOWN(null);
+    INT(NullValuePlaceHolder.INT) {
+      @Override
+      public RelDataType toType(RelDataTypeFactory typeFactory) {
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+      }
+    },
+    LONG(NullValuePlaceHolder.LONG) {
+      @Override
+      public RelDataType toType(RelDataTypeFactory typeFactory) {
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+      }
+    },
+    FLOAT(NullValuePlaceHolder.FLOAT) {
+      @Override
+      public RelDataType toType(RelDataTypeFactory typeFactory) {
+        return typeFactory.createSqlType(SqlTypeName.FLOAT);

Review Comment:
   Calcite `FLOAT` is 8 byte floating points - i.e., same as `DOUBLE`. Pinot's `FLOAT` type maps to Calcite's `REAL` type which is 4 byte floating points. See https://calcite.apache.org/docs/reference.html#scalar-types and https://github.com/apache/pinot/blob/3f324a4f0bfdecb1b14e459deec647a2ff2f734d/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java#L75-L76

   I think ideally we'd want to unify the `FieldSpec.DataType` -> `RelDataType` and `DataSchema.ColumnDataType` -> `RelDataType` conversions but I'm not sure there's an easy / clean way to do so with how things are currently structured.

##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainV2ResultBlock.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.plan.ExplainInfo;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public class ExplainV2ResultBlock extends BaseResultsBlock {
+
+  private final QueryContext _queryContext;
+  private final List<ExplainInfo> _physicalPlan;
+
+  public static final DataSchema EXPLAIN_RESULT_SCHEMA =
+      new DataSchema(new String[]{"Plan"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+
+  public ExplainV2ResultBlock(QueryContext queryContext, ExplainInfo physicalPlan) {
+    this(queryContext, Lists.newArrayList(physicalPlan));
+  }
+
+  public ExplainV2ResultBlock(QueryContext queryContext, List<ExplainInfo> physicalPlan) {
+    _queryContext = queryContext;
+    _physicalPlan = physicalPlan;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _physicalPlan.size();
+  }
+
+  @Nullable
+  @Override
+  public QueryContext getQueryContext() {
+    return _queryContext;
+  }
+
+  @Nullable
+  @Override
+  public DataSchema getDataSchema() {
+    return DataSchema.EXPLAIN_RESULT_SCHEMA;
+  }
+
+  @Nullable
+  @Override
+  public List<Object[]> getRows() {
+    List<Object[]> rows = new ArrayList<>(_physicalPlan.size());
+    try {
+      for (ExplainInfo node : _physicalPlan) {
+        rows.add(new Object[]{JsonUtils.objectToString(node)});
+      }
+      return rows;
+    } catch (JsonProcessingException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public DataTable getDataTable()
+      throws IOException {
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);

Review Comment:
   Same issue as above?

##########
pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java:
##########
@@ -46,4 +50,25 @@ public final T nextBlock() {
   // Make it protected because we should always call nextBlock()
   protected abstract T getNextBlock();
+
+  @Override
+  public ExplainInfo getExplainInfo() {
+    ExplainAttributeBuilder attributeBuilder = new ExplainAttributeBuilder();
+    explainAttributes(attributeBuilder);
+    return new ExplainInfo(getExplainName(), attributeBuilder.build(), getChildrenExplainInfo());
+  }
+
+  protected List<ExplainInfo> getChildrenExplainInfo() {
+    return getChildOperators().stream()
+        .filter(Objects::nonNull)
+        .map(Operator::getExplainInfo)
+        .collect(Collectors.toList());
+  }
+
+  protected String getExplainName() {

Review Comment:
   I think it'd be useful to have a Javadoc here to elaborate how this is different from `toExplainString`.
##########
pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java:
##########
@@ -37,23 +39,55 @@ public interface Operator<T extends Block> {
    * times, and will return non-empty block or null if no more documents available
    *
    * @throws EarlyTerminationException if the operator is early-terminated (interrupted) before processing the next
-   * block of data. Operator can early terminated when the query times out, or is already satisfied.
+   * block of data. Operator can early be terminated when the query times out, or is already satisfied.

Review Comment:
   ```suggestion
      * block of data. Operator can be early terminated when the query times out, or is already satisfied.
   ```

##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainV2ResultBlock.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.plan.ExplainInfo;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public class ExplainV2ResultBlock extends BaseResultsBlock {
+
+  private final QueryContext _queryContext;
+  private final List<ExplainInfo> _physicalPlan;
+
+  public static final DataSchema EXPLAIN_RESULT_SCHEMA =
+      new DataSchema(new String[]{"Plan"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+
+  public ExplainV2ResultBlock(QueryContext queryContext, ExplainInfo physicalPlan) {
+    this(queryContext, Lists.newArrayList(physicalPlan));
+  }
+
+  public ExplainV2ResultBlock(QueryContext queryContext, List<ExplainInfo> physicalPlan) {
+    _queryContext = queryContext;
+    _physicalPlan = physicalPlan;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _physicalPlan.size();
+  }
+
+  @Nullable
+  @Override
+  public QueryContext getQueryContext() {
+    return _queryContext;
+  }
+
+  @Nullable
+  @Override
+  public DataSchema getDataSchema() {
+    return DataSchema.EXPLAIN_RESULT_SCHEMA;

Review Comment:
   ```suggestion
       return EXPLAIN_RESULT_SCHEMA;
   ```
   I think this is accidentally using the wrong result schema?
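A minimal sketch of the name-resolution point behind this suggestion. `SharedSchema` and `ResultBlock` are hypothetical stand-ins (not Pinot classes) for two classes that each declare a constant named `EXPLAIN_RESULT_SCHEMA`: the qualified reference selects the other class's constant, while the unqualified name resolves to the one declared in the enclosing class.

```java
public class SchemaConstantSketch {
  // Hypothetical stand-in for a shared class that declares its own constant.
  static class SharedSchema {
    static final String EXPLAIN_RESULT_SCHEMA = "shared";
  }

  // Hypothetical stand-in for a result block declaring a constant with the same name.
  static class ResultBlock {
    static final String EXPLAIN_RESULT_SCHEMA = "block-local";

    String qualified() {
      // Explicitly selects the constant declared in SharedSchema.
      return SharedSchema.EXPLAIN_RESULT_SCHEMA;
    }

    String unqualified() {
      // Resolves to the constant declared in this class.
      return EXPLAIN_RESULT_SCHEMA;
    }
  }

  public static void main(String[] args) {
    ResultBlock block = new ResultBlock();
    System.out.println(block.qualified());
    System.out.println(block.unqualified());
  }
}
```

Both forms compile, which is why a mix-up of this kind is easy to miss without a test that checks the returned schema.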
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java:
##########
@@ -100,4 +103,21 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
+
+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);
+    if (_aggregationFunctions.length == 0) {
+      return;
+    }
+    List<String> aggregations = Arrays.stream(_aggregationFunctions)
+        .map(AggregationFunction::toExplainString)
+        .collect(Collectors.toList());
+    attributeBuilder.putLongIdempotent("numAggregations", aggregations.size());

Review Comment:
   This seems odd - we're collecting the explain strings of all the aggregation functions but then only including the number of aggregations?

##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -193,3 +194,29 @@ message WindowNode {
   int32 upperBound = 6;
   repeated Literal constants = 7;
 }
+
+message ExplainNode {
+  string type = 1;

Review Comment:
   Is it not possible for this to be an enum?

##########
pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan.
+ *
+ * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage
+ * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send
+ * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format).
+ *
+ * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that
+ * leaf operator is where nodes are compiled into single-stage physical operators
+ * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes
+ * are being used or not.
+ *
+ * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite.
+ * <b>They are not meant to be executed</b>.
+ * Instead, when physical explain is required, the broker ask for the final plan to each server. They return
+ * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then
+ * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding
+ * PinotExplainedRelNode.
+ */
+public class PinotExplainedRelNode extends AbstractRelNode {
+
+  /**
+   * The name of this node, whose role is like a title in the explain plan.
+   */
+  private final String _type;
+  private final Map<String, Plan.ExplainNode.AttributeValue> _attributes;
+  private final List<RelNode> _inputs;
+  private final DataSchema _dataSchema;
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, RelNode input) {
+    this(cluster, traitSet, type, attributes, dataSchema, Lists.newArrayList(input));
+  }
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, List<? extends RelNode> inputs) {
+    super(cluster, traitSet);
+    _type = type;
+    _attributes = attributes;
+    _inputs = new ArrayList<>(inputs);
+    _dataSchema = dataSchema;
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    int size = _dataSchema.size();
+    List<RelDataTypeField> fields = new ArrayList<>(size);
+    RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
+    for (int i = 0; i < size; i++) {
+      String columnName = _dataSchema.getColumnName(i);
+
+      DataSchema.ColumnDataType columnDataType = _dataSchema.getColumnDataType(i);
+      RelDataType type = columnDataType.toType(typeFactory);
+
+      fields.add(new RelDataTypeFieldImpl(columnName, i, type));
+    }
+    return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false);

Review Comment:
   nit: looks like this is the default `StructKind` and nullability used in `RelRecordType` so we can simplify this?
   ```suggestion
       return new RelRecordType(fields);
   ```

##########
pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java:
##########
@@ -100,6 +100,17 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }

+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);

Review Comment:
   Unnecessary `super` call?

##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -230,4 +232,26 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
+
+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);
+    if (_groupByExpressions.length > 0) {
+      List<String> groupKeys = Arrays.stream(_groupByExpressions)
+          .map(ExpressionContext::toString)
+          .collect(Collectors.toList());
+      attributeBuilder.putJson("groupKeys", groupKeys);
+    }
+    if (_aggregationFunctions.length > 0) {
+      List<String> aggregations = Arrays.stream(_aggregationFunctions)
+          .map(AggregationFunction::toExplainString)
+          .collect(Collectors.toList());
+      attributeBuilder.putJson("aggregations", aggregations);
+    }

Review Comment:
   Can't comment on that file directly but we should add this to `FilteredAggregationOperator` as well.

##########
pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan.
+ *
+ * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage
+ * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send
+ * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format).
+ *
+ * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that
+ * leaf operator is where nodes are compiled into single-stage physical operators
+ * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes
+ * are being used or not.
+ *
+ * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite.
+ * <b>They are not meant to be executed</b>.
+ * Instead, when physical explain is required, the broker ask for the final plan to each server. They return
+ * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then
+ * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding
+ * PinotExplainedRelNode.
+ */
+public class PinotExplainedRelNode extends AbstractRelNode {
+
+  /**
+   * The name of this node, whose role is like a title in the explain plan.
+   */
+  private final String _type;
+  private final Map<String, Plan.ExplainNode.AttributeValue> _attributes;
+  private final List<RelNode> _inputs;
+  private final DataSchema _dataSchema;
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, RelNode input) {
+    this(cluster, traitSet, type, attributes, dataSchema, Lists.newArrayList(input));
+  }
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, List<? extends RelNode> inputs) {
+    super(cluster, traitSet);
+    _type = type;
+    _attributes = attributes;
+    _inputs = new ArrayList<>(inputs);
+    _dataSchema = dataSchema;
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    int size = _dataSchema.size();
+    List<RelDataTypeField> fields = new ArrayList<>(size);
+    RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
+    for (int i = 0; i < size; i++) {
+      String columnName = _dataSchema.getColumnName(i);
+
+      DataSchema.ColumnDataType columnDataType = _dataSchema.getColumnDataType(i);
+      RelDataType type = columnDataType.toType(typeFactory);
+
+      fields.add(new RelDataTypeFieldImpl(columnName, i, type));
+    }
+    return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false);
+  }
+
+  @Override
+  public List<RelNode> getInputs() {
+    return Collections.unmodifiableList(_inputs);
+  }
+
+  @Override
+  public void replaceInput(int ordinalInParent, RelNode p) {
+    _inputs.set(ordinalInParent, p);
+  }
+
+  @Override
+  public String getRelTypeName() {
+    return _type;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    RelWriter relWriter = super.explainTerms(pw);
+    int inputSize = getInputs().size();
+    if (inputSize == 1) {
+      relWriter.input("input", getInputs().get(0));
+    } else if (inputSize > 1) {
+      for (int i = 0; i < inputSize; i++) {
+        relWriter.input("input#" + i, getInputs().get(i));
+      }
+    }

Review Comment:
   Where is this input term string being used? Does it matter if for a single input node we use `input#0` instead of `input`?

##########
pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan.
+ *
+ * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage
+ * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send
+ * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format).
+ *
+ * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that
+ * leaf operator is where nodes are compiled into single-stage physical operators
+ * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes
+ * are being used or not.
+ *
+ * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite.
+ * <b>They are not meant to be executed</b>.
+ * Instead, when physical explain is required, the broker ask for the final plan to each server. They return
+ * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then
+ * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding
+ * PinotExplainedRelNode.
+ */
+public class PinotExplainedRelNode extends AbstractRelNode {
+
+  /**
+   * The name of this node, whose role is like a title in the explain plan.
+   */
+  private final String _type;
+  private final Map<String, Plan.ExplainNode.AttributeValue> _attributes;
+  private final List<RelNode> _inputs;
+  private final DataSchema _dataSchema;
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, RelNode input) {
+    this(cluster, traitSet, type, attributes, dataSchema, Lists.newArrayList(input));
+  }
+
+  public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type,
+      Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, List<?
extends RelNode> inputs) { + super(cluster, traitSet); + _type = type; + _attributes = attributes; + _inputs = new ArrayList<>(inputs); + _dataSchema = dataSchema; + } + + @Override + protected RelDataType deriveRowType() { + int size = _dataSchema.size(); + List<RelDataTypeField> fields = new ArrayList<>(size); + RelDataTypeFactory typeFactory = getCluster().getTypeFactory(); + for (int i = 0; i < size; i++) { + String columnName = _dataSchema.getColumnName(i); + + DataSchema.ColumnDataType columnDataType = _dataSchema.getColumnDataType(i); + RelDataType type = columnDataType.toType(typeFactory); + + fields.add(new RelDataTypeFieldImpl(columnName, i, type)); + } + return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false); + } + + @Override + public List<RelNode> getInputs() { + return Collections.unmodifiableList(_inputs); + } + + @Override + public void replaceInput(int ordinalInParent, RelNode p) { + _inputs.set(ordinalInParent, p); + } + + @Override + public String getRelTypeName() { + return _type; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + RelWriter relWriter = super.explainTerms(pw); + int inputSize = getInputs().size(); + if (inputSize == 1) { + relWriter.input("input", getInputs().get(0)); + } else if (inputSize > 1) { + for (int i = 0; i < inputSize; i++) { + relWriter.input("input#" + i, getInputs().get(i)); + } + } + for (Map.Entry<String, Plan.ExplainNode.AttributeValue> entry : _attributes.entrySet()) { + Plan.ExplainNode.AttributeValue value = entry.getValue(); + if (value.hasString()) { + relWriter.item(entry.getKey(), value.getString()); + } else if (value.hasLong()) { + relWriter.item(entry.getKey(), value.getLong()); + } else if (value.hasBool()) { + relWriter.item(entry.getKey(), value.getBool()); + } else if (value.hasJson()) { + relWriter.item(entry.getKey(), value.getJson()); + } else { + relWriter.item(entry.getKey(), "unknown value"); + } Review Comment: nit: might be better to use `switch 
(value.getValueCase())` here instead. ########## pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java: ########## @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.plan; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rel.type.StructKind; +import org.apache.pinot.common.proto.Plan; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan. 
+ * + * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage + * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send + * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format). + * + * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that + * leaf operator is where nodes are compiled into single-stage physical operators + * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes + * are being used or not. + * + * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite. + * <b>They are not meant to be executed</b>. + * Instead, when physical explain is required, the broker ask for the final plan to each server. They return + * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then + * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding + * PinotExplainedRelNode. + */ +public class PinotExplainedRelNode extends AbstractRelNode { + + /** + * The name of this node, whose role is like a title in the explain plan. + */ + private final String _type; + private final Map<String, Plan.ExplainNode.AttributeValue> _attributes; + private final List<RelNode> _inputs; + private final DataSchema _dataSchema; + + public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type, Review Comment: Unused constructor? ########## pinot-core/src/main/java/org/apache/pinot/core/plan/PinotExplainedRelNode.java: ########## @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.plan; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rel.type.StructKind; +import org.apache.pinot.common.proto.Plan; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * This class is a hackish way to adapt Pinot explain plan into Calcite explain plan. + * + * While in Calcite both logical and physical operators are represented by classes that extend RelNode, in multi-stage + * Pinot plans we first optimize {@link RelNode RelNodes} at logical level in the broker and then we transform send + * these RelNodes to the server (using {@code org.apache.pinot.query.planner.plannode.PlanNode} as intermediate format). 
+ * + * Servers then extract leaf nodes from the stage, trying to execute as much as possible in a leaf operator. In that + * leaf operator is where nodes are compiled into single-stage physical operators + * (aka {@link org.apache.pinot.core.operator.BaseOperator}), which include important information like whether indexes + * are being used or not. + * + * {@link PinotExplainedRelNode} is a way to represent these single-stage operators in Calcite. + * <b>They are not meant to be executed</b>. + * Instead, when physical explain is required, the broker ask for the final plan to each server. They return + * a PlanNode and the broker then converts these PlanNodes into a PinotExplainedRelNode. The logical plan is then + * modified to substitute the nodes that were converted into single-stage physical plans with the corresponding + * PinotExplainedRelNode. + */ +public class PinotExplainedRelNode extends AbstractRelNode { + + /** + * The name of this node, whose role is like a title in the explain plan. + */ + private final String _type; + private final Map<String, Plan.ExplainNode.AttributeValue> _attributes; + private final List<RelNode> _inputs; + private final DataSchema _dataSchema; + + public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type, + Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, RelNode input) { + this(cluster, traitSet, type, attributes, dataSchema, Lists.newArrayList(input)); + } + + public PinotExplainedRelNode(RelOptCluster cluster, RelTraitSet traitSet, String type, + Map<String, Plan.ExplainNode.AttributeValue> attributes, DataSchema dataSchema, List<? 
extends RelNode> inputs) { + super(cluster, traitSet); + _type = type; + _attributes = attributes; + _inputs = new ArrayList<>(inputs); + _dataSchema = dataSchema; + } + + @Override + protected RelDataType deriveRowType() { + int size = _dataSchema.size(); + List<RelDataTypeField> fields = new ArrayList<>(size); + RelDataTypeFactory typeFactory = getCluster().getTypeFactory(); + for (int i = 0; i < size; i++) { + String columnName = _dataSchema.getColumnName(i); + + DataSchema.ColumnDataType columnDataType = _dataSchema.getColumnDataType(i); + RelDataType type = columnDataType.toType(typeFactory); + + fields.add(new RelDataTypeFieldImpl(columnName, i, type)); + } + return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false); + } + + @Override + public List<RelNode> getInputs() { + return Collections.unmodifiableList(_inputs); + } + + @Override + public void replaceInput(int ordinalInParent, RelNode p) { + _inputs.set(ordinalInParent, p); + } + + @Override + public String getRelTypeName() { + return _type; + } Review Comment: There's a comment in `AbstractRelNode` that says that it is not recommended to override this method (default implementation uses the class name). Any particular reason we need this? ########## pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java: ########## @@ -85,7 +85,7 @@ public class QueryContext { private final int _offset; private final Map<String, String> _queryOptions; private final Map<ExpressionContext, ExpressionContext> _expressionOverrideHints; - private final boolean _explain; + private final ExplainMode _explain; Review Comment: Seems a little counter-intuitive to have an "explain mode" for actual queries as well - maybe we can use `null` here for `_explain` instead of having an enum value `NONE`? 
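To make the `null` suggestion concrete, here is a minimal sketch of what it could look like. It is not taken from the PR: the class name `QueryContextSketch` and the enum values `DESCRIPTION` and `NODE` are hypothetical placeholders, and the real `QueryContext` has many more fields.

```java
// Sketch only: QueryContextSketch and the enum values below are hypothetical
// stand-ins used to illustrate the "null instead of NONE" idea.
final class QueryContextSketch {
  // No NONE value: a regular (non-explain) query simply has no explain mode.
  enum ExplainMode {
    DESCRIPTION, NODE
  }

  // null means "this is a regular query, not an EXPLAIN".
  private final ExplainMode _explain;

  QueryContextSketch(ExplainMode explain) {
    _explain = explain;
  }

  // Keeps the old boolean-style check available for existing callers.
  boolean isExplain() {
    return _explain != null;
  }

  // May return null; callers are expected to check isExplain() first.
  ExplainMode getExplainMode() {
    return _explain;
  }
}
```

The trade-off is that callers must remember the null check, whereas an explicit `NONE` value can be switched over safely, so this is a matter of taste rather than a blocker.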
########## pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java: ########## @@ -155,11 +157,23 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) { } } + ExplainMode explainMode; + if (!pinotQuery.isExplain()) { + explainMode = ExplainMode.NONE; + } else { + String useMultistageEngine = CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE; + if (pinotQuery.getQueryOptions() != null && pinotQuery.getQueryOptions().containsKey(useMultistageEngine)) { Review Comment: Shouldn't we check if the option value is `true` instead of just checking for the key's existence? We can use `QueryOptionsUtils::isUseMultistageEngine` here. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.List; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest { + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema"; + private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE; + private static final String DATABASE_NAME = "db1"; + private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." 
+ DEFAULT_TABLE_NAME; + private String _tableName = DEFAULT_TABLE_NAME; + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + brokerConf.setProperty(CommonConstants.MultiStageQueryRunner.ASK_SERVERS_FOR_EXPLAIN_PLAN, "true"); + } + + @BeforeMethod + public void resetMultiStage() { + setUseMultiStageQueryEngine(true); + } + + @Test + public void simpleQuery() { + explain("SELECT 1 FROM mytable", + //@formatter:off + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" Review Comment: Right now, these plans look like two different types of plans mashed together (which they essentially are, but might be nice to hide that for users). Do you think it makes sense to unify the casing used by the leaf stage nodes' explain terms to match that of Calcite? I know that we're currently using the same format as what is used for single-stage engine explain queries and we might not want to change that so maybe we could just do a simple blind conversion from `SCREAMING_SNAKE_CASE` to `CamelCase`. 
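As a rough illustration of the blind conversion mentioned above, something like the following would probably be enough. The helper name `ExplainNameCase.toCamelCase` is hypothetical and not part of this PR; it only assumes the leaf-stage names are plain `SCREAMING_SNAKE_CASE` strings such as `LEAF_STAGE_COMBINE_OPERATOR`.

```java
import java.util.Locale;

// Sketch only: ExplainNameCase is a hypothetical helper, not an existing Pinot class.
final class ExplainNameCase {
  private ExplainNameCase() {
  }

  // Converts e.g. "DOC_ID_SET" to "DocIdSet". Empty segments produced by
  // leading, trailing or doubled underscores are skipped.
  static String toCamelCase(String screamingSnake) {
    StringBuilder sb = new StringBuilder(screamingSnake.length());
    for (String word : screamingSnake.split("_")) {
      if (word.isEmpty()) {
        continue;
      }
      // Upper-case the first character and lower-case the rest of the word.
      sb.append(Character.toUpperCase(word.charAt(0)));
      sb.append(word.substring(1).toLowerCase(Locale.ROOT));
    }
    return sb.toString();
  }
}
```

Guava's `CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, name)` should give the same result if we prefer not to hand-roll it. Either way, the conversion could be applied only when building the multi-stage explain output, so the single-stage explain format stays untouched.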
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java: ########## @@ -167,6 +173,26 @@ protected TransferableBlock updateEosBlock(TransferableBlock upstreamEos, StatMa return upstreamEos; } + @Override + public ExplainInfo getExplainInfo() { + return new ExplainInfo(getExplainName(), getExplainAttributes(), getChildrenExplainInfo()); + } + + protected List<ExplainInfo> getChildrenExplainInfo() { + return getChildOperators().stream() + .filter(Objects::nonNull) + .map(Operator::getExplainInfo) + .collect(Collectors.toList()); + } + + protected String getExplainName() { + return toExplainString(); + } + + protected Map<String, Plan.ExplainNode.AttributeValue> getExplainAttributes() { + return Collections.emptyMap(); + } Review Comment: These are added to `BaseOperator` as well, so why not put them in the parent `Operator` interface. FWICT, all implementations are either implementations of `BaseOperator` or `MultiStageOperator`? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java: ########## @@ -44,8 +43,13 @@ public PinotTable(Schema schema) { @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { - Preconditions.checkState(relDataTypeFactory instanceof TypeFactory); - TypeFactory typeFactory = (TypeFactory) relDataTypeFactory; + // TODO: Look for a better solution + TypeFactory typeFactory; + if (relDataTypeFactory instanceof TypeFactory) { + typeFactory = (TypeFactory) relDataTypeFactory; + } else { + typeFactory = TypeFactory.INSTANCE; + } Review Comment: Why was this change needed? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java: ########## @@ -172,7 +185,27 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod SqlExplainLevel level = explain.getDetailLevel() == null ? 
SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel(); Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel); - return new QueryPlannerResult(null, PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames); + if (!explain.withImplementation() || !askServers) { Review Comment: It seems a little strange that we have `EXPLAIN IMPLEMENTATION PLAN FOR...` which results in the `SqlPhysicalExplain` branch above but here we're checking for `SqlExplain::withImplementation` and the plans returned are different. Basically, I think it'll be super confusing to end users that `EXPLAIN IMPLEMENTATION PLAN FOR...` and `EXPLAIN PLAN WITH IMPLEMENTATION FOR...` return very different looking results. I see that in the original issue for `EXPLAIN IMPLEMENTATION PLAN FOR...` (https://github.com/apache/pinot/issues/10901), the proposal originally was to use `EXPLAIN PHYSICAL PLAN FOR...` syntax, although I'm not sure we want to go that route either because the plan's semantics don't exactly match Calcite's physical plan semantics? ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.List; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest { + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema"; + private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE; + private static final String DATABASE_NAME = "db1"; + private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." 
+ DEFAULT_TABLE_NAME; + private String _tableName = DEFAULT_TABLE_NAME; + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + brokerConf.setProperty(CommonConstants.MultiStageQueryRunner.ASK_SERVERS_FOR_EXPLAIN_PLAN, "true"); + } + + @BeforeMethod + public void resetMultiStage() { + setUseMultiStageQueryEngine(true); + } + + @Test + public void simpleQuery() { + explain("SELECT 1 FROM mytable", + //@formatter:off + "Execution Plan\n" + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LEAF_STAGE_COMBINE_OPERATOR\n" + + " STREAMING_INSTANCE_RESPONSE\n" + + " STREAMING_COMBINE_SELECT(repeatedOnSegments=[12])\n" + + " SELECT_STREAMING(totalDocs=[115545])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[120000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[115545])\n"); + //@formatter:on + } + + @Test + public void simpleQueryVerbose() { + explainVerbose("SELECT 1 FROM mytable", + //@formatter:off + "Execution Plan\n" + + "IntermediateCombine\n" + + " Alternative(servers=[1])\n" + + " PinotLogicalExchange(distribution=[broadcast])\n" + + " 
LEAF_STAGE_COMBINE_OPERATOR\n" + + " STREAMING_INSTANCE_RESPONSE\n" + + " STREAMING_COMBINE_SELECT\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " Alternative(servers=[1])\n" + + " PinotLogicalExchange(distribution=[broadcast])\n" + + " LEAF_STAGE_COMBINE_OPERATOR\n" + + " STREAMING_INSTANCE_RESPONSE\n" + + " STREAMING_COMBINE_SELECT\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " 
DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n" + + " SELECT_STREAMING(segment=[any], totalDocs=[any])\n" + + " TRANSFORM(expressions=[[\"'1'\"]])\n" + + " PROJECT(columns=[[]])\n" + + " DOC_ID_SET(maxDocs=[10000])\n" + + " FILTER_MATCH_ENTIRE_SEGMENT(numDocs=[any])\n"); + //@formatter:on + } + + @Test + public void simpleQueryLogical() { + explainLogical("SELECT 1 FROM mytable", + //@formatter:off + "Execution Plan\n" + + "LogicalProject(EXPR$0=[1])\n" + + " LogicalTableScan(table=[[default, mytable]])\n"); + //@formatter:on + } + + @Override + protected String getTableName() { + return _tableName; + } Review Comment: Same as above, isn't this just using the default table name? ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.List; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class MultiStageEngineExplainIntegrationTest extends BaseClusterIntegrationTest { + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema"; + private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE; + private static final String DATABASE_NAME = "db1"; + private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." + DEFAULT_TABLE_NAME; Review Comment: Unused? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java: ########## @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.explain; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.proto.Plan; +import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.ExplainedNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ExplainNodeSimplifier { + private static final Logger LOGGER = 
LoggerFactory.getLogger(ExplainNodeSimplifier.class); + + private ExplainNodeSimplifier() { + } + + public static PlanNode simplifyNode(PlanNode plan1) { + Visitor planNodeMerger = new Visitor(); + return plan1.visit(planNodeMerger, null); + } + + private static class Visitor implements PlanNodeVisitor<PlanNode, Void> { + private static final String REPEAT_ATTRIBUTE_KEY = "repeatedOnSegments"; + + private PlanNode defaultNode(PlanNode node) { + List<PlanNode> inputs = node.getInputs(); + List<PlanNode> newInputs = simplifyChildren(inputs); + return inputs != newInputs ? node.withInputs(newInputs) : node; + } + + @Override + public PlanNode visitAggregate(AggregateNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitFilter(FilterNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitJoin(JoinNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitMailboxReceive(MailboxReceiveNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitMailboxSend(MailboxSendNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitProject(ProjectNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitSort(SortNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitTableScan(TableScanNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitValue(ValueNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitWindow(WindowNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitSetOp(SetOpNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode visitExchange(ExchangeNode node, Void context) { + return defaultNode(node); + } + + @Override + public PlanNode 
visitExplained(ExplainedNode node, Void context) { + if (!node.getType().contains(ExplainPlanDataTableReducer.COMBINE) || node.getInputs().size() <= 1) { + return defaultNode(node); + } + List<PlanNode> simplifiedChildren = simplifyChildren(node.getInputs()); + PlanNode child1 = simplifiedChildren.get(0); + + for (int i = 1; i < simplifiedChildren.size(); i++) { + PlanNode child2 = simplifiedChildren.get(i); + PlanNode merged = PlanNodeMerger.mergePlans(child1, child2, false); + if (merged == null) { + LOGGER.info("Found unmergeable inputs on node of type {}: {} and {}", node, child1, child2); + assert false : "Unmergeable inputs found"; Review Comment: Why would we want to fail here if and only if assertions are enabled? ########## pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java: ########## @@ -46,4 +50,25 @@ public final T nextBlock() { // Make it protected because we should always call nextBlock() protected abstract T getNextBlock(); + + @Override + public ExplainInfo getExplainInfo() { + ExplainAttributeBuilder attributeBuilder = new ExplainAttributeBuilder(); + explainAttributes(attributeBuilder); + return new ExplainInfo(getExplainName(), attributeBuilder.build(), getChildrenExplainInfo()); + } + + protected List<ExplainInfo> getChildrenExplainInfo() { + return getChildOperators().stream() + .filter(Objects::nonNull) + .map(Operator::getExplainInfo) + .collect(Collectors.toList()); + } + + protected String getExplainName() { Review Comment: Also, it looks like the vast majority of `toExplainString` are basically `getExplainName` + `explainAttributes` in string form. I think we might be able to clean up the interface to avoid this duplication in most operator implementations? Although this isn't super critical and can be done as a follow up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
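On the `assert false` question in the `ExplainNodeSimplifier` comment: a Java `assert` statement is only evaluated when the JVM runs with assertions enabled (`-ea`), so on a default JVM the unmergeable case would be logged and execution would simply continue. Below is a minimal sketch of the difference. It assumes a hypothetical `mergeOrNull` helper standing in for `PlanNodeMerger.mergePlans`, and a hypothetical fallback of returning the first input; the quoted diff does not show what the PR actually does after the assertion.

```java
public class MergeCheckSketch {
  // Hypothetical stand-in for PlanNodeMerger.mergePlans: returns null when inputs cannot be merged.
  static String mergeOrNull(String a, String b) {
    return a.equals(b) ? a : null;
  }

  // Mirrors the pattern under review: fails only when the JVM runs with -ea.
  static String mergeWithAssert(String a, String b) {
    String merged = mergeOrNull(a, b);
    if (merged == null) {
      assert false : "Unmergeable inputs found";
      // Reached only when assertions are disabled (the JVM default).
      // Returning the first input is an assumed fallback, not the PR's behavior.
      return a;
    }
    return merged;
  }

  // Alternative: fail the same way regardless of JVM flags.
  static String mergeStrict(String a, String b) {
    String merged = mergeOrNull(a, b);
    if (merged == null) {
      throw new IllegalStateException("Unmergeable inputs found: " + a + " and " + b);
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(mergeStrict("scan", "scan"));
    try {
      mergeStrict("scan", "filter");
    } catch (IllegalStateException e) {
      System.out.println("strict variant rejected the inputs: " + e.getMessage());
    }
  }
}
```

If the intent is for tests to catch unmergeable inputs while production degrades gracefully, the `assert` variant may be deliberate; if the failure should be visible everywhere, an explicit exception (or a documented fallback without the assertion) states that more plainly.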
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org