This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 8c8380005b6d833ee9d20ad83487aac1439e0019 Author: Rong Rong <ro...@apache.org> AuthorDate: Thu Mar 24 21:13:51 2022 -0700 Add pinot-query-planner module (#8340) * add pinot-query-planner - fix calcite upgrade compilation issue - fix query compilation runtime after calcite 1.29 upgrade - linter * address diff comments and add more TODOs Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/common/config/provider/TableCache.java | 10 + pinot-query-planner/pom.xml | 92 ++++ .../apache/calcite/jdbc/CalciteSchemaBuilder.java | 52 +++ .../org/apache/pinot/query/QueryEnvironment.java | 181 ++++++++ .../apache/pinot/query/catalog/PinotCatalog.java | 122 +++++ .../org/apache/pinot/query/catalog/PinotTable.java | 61 +++ .../apache/pinot/query/context/PlannerContext.java | 40 ++ .../query/parser/CalciteExpressionParser.java | 502 +++++++++++++++++++++ .../pinot/query/parser/CalciteSqlParser.java | 148 ++++++ .../org/apache/pinot/query/parser/ParserUtils.java | 63 +++ .../apache/pinot/query/parser/QueryRewriter.java | 46 ++ .../apache/pinot/query/planner/LogicalPlanner.java | 63 +++ .../org/apache/pinot/query/planner/QueryPlan.java | 60 +++ .../pinot/query/planner/RelToStageConverter.java | 71 +++ .../apache/pinot/query/planner/StageMetadata.java | 85 ++++ .../apache/pinot/query/planner/StagePlanner.java | 126 ++++++ .../query/planner/nodes/AbstractStageNode.java | 49 ++ .../apache/pinot/query/planner/nodes/CalcNode.java | 40 ++ .../apache/pinot/query/planner/nodes/JoinNode.java | 86 ++++ .../query/planner/nodes/MailboxReceiveNode.java | 54 +++ .../pinot/query/planner/nodes/MailboxSendNode.java | 55 +++ .../pinot/query/planner/nodes/StageNode.java | 40 ++ .../pinot/query/planner/nodes/TableScanNode.java | 57 +++ .../partitioning/FieldSelectionKeySelector.java | 39 ++ .../query/planner/partitioning/KeySelector.java | 37 ++ .../apache/pinot/query/routing/WorkerInstance.java | 51 +++ .../apache/pinot/query/routing/WorkerManager.java | 95 ++++ .../query/rules/PinotExchangeNodeInsertRule.java | 82 ++++ .../pinot/query/rules/PinotQueryRuleSets.java | 91 ++++ .../org/apache/pinot/query/type/TypeFactory.java | 84 ++++ .../org/apache/pinot/query/type/TypeSystem.java | 30 ++ .../org/apache/pinot/query/validate/Validator.java | 36 ++ .../apache/pinot/query/QueryEnvironmentTest.java | 122 +++++ .../pinot/query/QueryEnvironmentTestUtils.java | 131 ++++++ pom.xml | 6 +- 35 files changed, 2903 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index 17fbf73cd3..005fd65615 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -227,6 +227,16 @@ public class TableCache implements PinotConfigProvider { } } + /** + * Return a map between lower-case table name and their canonicalized form. Key-value pair are only different in + * case-sensitive environment. + * + * @return the table name map. 
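// A hypothetical lookup against the table name map described above; the variable
// and the sample table name are illustrative only, not part of this commit:
String canonicalName = tableCache.getTableNameMap().get("mytable");  // e.g. "mytable" -> "myTable"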
+ */ + public Map<String, String> getTableNameMap() { + return _tableNameMap; + } + private void addTableConfigs(List<String> paths) { // Subscribe data changes before reading the data to avoid missing changes for (String path : paths) { diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml new file mode 100644 index 0000000000..8d1af64a19 --- /dev/null +++ b/pinot-query-planner/pom.xml @@ -0,0 +1,92 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>pinot</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.10.0-SNAPSHOT</version> + </parent> + <artifactId>pinot-query-planner</artifactId> + <name>Pinot Query Planner</name> + <url>https://pinot.apache.org/</url> + + <properties> + <pinot.root>${basedir}/..</pinot.root> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Pinot dependencies --> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-common</artifactId> + </dependency> + + <!-- Calcite dependencies --> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>3.0.9</version> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>3.0.9</version> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java new file mode 100644 index 0000000000..ce3d1c99f9 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.calcite.jdbc; + +import org.apache.calcite.schema.Schema; + + +/** + * This class is used to create a {@link CalciteSchema} with a given {@link Schema} as the root. + * + * <p>This class resides in calcite.jdbc namespace because there's no complex logic we have in terms of catalog-based + * schema construct. We instead create a {@link SimpleCalciteSchema} that's package protected. + */ +public class CalciteSchemaBuilder { + + private CalciteSchemaBuilder() { + // do not instantiate. + } + + /** + * Creates a {@link CalciteSchema} with a given {@link Schema} as the root. + * + * <p>Calcite creates two layer of abstraction, the {@link CalciteSchema} is use internally for planner and + * {@link Schema} is user-facing with overrides. In our case we don't have a complex internal wrapper extension + * so we only reuse the package protected {@link SimpleCalciteSchema}. + * + * <p>If there's need to extend this feature for planner functionalities we should create our own extension to the + * {@link CalciteSchema}. + * + * @param root schema to use as a root schema + * @return calcite schema with given schema as the root + */ + public static CalciteSchema asRootSchema(Schema root) { + return new SimpleCalciteSchema(null, root, ""); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java new file mode 100644 index 0000000000..215c0051d7 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
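// A minimal sketch of the CalciteSchemaBuilder wiring described above, assuming a
// TableCache instance is available; the variable names are illustrative:
CalciteSchema rootSchema = CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache));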
+ */ +package org.apache.pinot.query; + +import java.util.Collection; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.prepare.PlannerImpl; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.sql2rel.StandardConvertletTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.parser.CalciteSqlParser; +import org.apache.pinot.query.planner.LogicalPlanner; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.StagePlanner; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.rules.PinotQueryRuleSets; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.validate.Validator; + + +/** + * The {@code QueryEnvironment} contains the main entrypoint for query planning. + * + * <p>It provide the higher level entry interface to convert a SQL string into a {@link QueryPlan}. + */ +public class QueryEnvironment { + // Calcite configurations + private final FrameworkConfig _config; + + // Calcite extension/plugins + private final CalciteSchema _rootSchema; + private final PlannerImpl _planner; + private final Prepare.CatalogReader _catalogReader; + private final RelDataTypeFactory _typeFactory; + private final RelOptPlanner _relOptPlanner; + private final SqlValidator _validator; + + // Pinot extensions + private final Collection<RelOptRule> _logicalRuleSet; + private final WorkerManager _workerManager; + + public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, WorkerManager workerManager) { + _typeFactory = typeFactory; + _rootSchema = rootSchema; + _workerManager = workerManager; + _config = Frameworks.newConfigBuilder().traitDefs().build(); + + // Planner is not thread-safe. must be reset() after each use. + _planner = new PlannerImpl(_config); + + // catalog + _catalogReader = new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, null); + _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, _typeFactory); + + // optimizer rules + _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES; + + // optimizer + HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); + for (RelOptRule relOptRule : _logicalRuleSet) { + hepProgramBuilder.addRuleInstance(relOptRule); + } + _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(), Contexts.EMPTY_CONTEXT); + } + + /** + * Plan a SQL query. + * + * <p>Noted that since Calcite's {@link org.apache.calcite.tools.Planner} is not threadsafe. + * Only one query query can be planned at a time. Afterwards planner needs to be reset in order to clear the + * state for the next planning. 
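// A hypothetical end-to-end sketch of the plan-then-reset usage described above.
// The TypeFactory/TypeSystem constructor arguments and the workerManager variable
// are assumptions for illustration and are not confirmed by this diff:
QueryEnvironment queryEnvironment = new QueryEnvironment(
    new TypeFactory(new TypeSystem()),
    CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
    workerManager);
// planQuery(...) is not thread-safe; callers must serialize access to one instance.
synchronized (queryEnvironment) {
  QueryPlan queryPlan = queryEnvironment.planQuery("SELECT a.col1 FROM a JOIN b ON a.col2 = b.col2");
}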
+ * + * <p>In order for faster planning, we pre-constructed all the planner objects and use the plan-then-reset + * model. Thusn when using {@code QueryEnvironment#planQuery(String)}, caller should ensure that no concurrent + * plan execution occurs. + * + * TODO: follow benchmark and profile to measure whether it make sense for the latency-concurrency trade-off + * between reusing plannerImpl vs. create a new planner for each query. + * + * @param sqlQuery SQL query string. + * @return a dispatchable query plan + */ + public QueryPlan planQuery(String sqlQuery) { + PlannerContext plannerContext = new PlannerContext(); + try { + SqlNode parsed = parse(sqlQuery, plannerContext); + SqlNode validated = validate(parsed); + RelRoot relation = toRelation(validated, plannerContext); + RelNode optimized = optimize(relation, plannerContext); + return toDispatchablePlan(optimized, plannerContext); + } catch (Exception e) { + throw new RuntimeException("Error composing query plan for: " + sqlQuery, e); + } finally { + _planner.close(); + _planner.reset(); + } + } + + // -------------------------------------------------------------------------- + // steps + // -------------------------------------------------------------------------- + + protected SqlNode parse(String query, PlannerContext plannerContext) + throws Exception { + // 1. invoke CalciteSqlParser to parse out SqlNode; + return CalciteSqlParser.compile(query, plannerContext); + } + + protected SqlNode validate(SqlNode parsed) + throws Exception { + // 2. validator to validate. + SqlNode validated = _validator.validate(parsed); + if (null == validated || !validated.getKind().belongsTo(SqlKind.QUERY)) { + throw new IllegalArgumentException( + String.format("unsupported SQL query, cannot validate out a valid sql from:\n%s", parsed)); + } + return validated; + } + + protected RelRoot toRelation(SqlNode parsed, PlannerContext plannerContext) { + // 3. convert sqlNode to relNode. + RexBuilder rexBuilder = new RexBuilder(_typeFactory); + RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, rexBuilder); + SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(_planner, _validator, _catalogReader, cluster, StandardConvertletTable.INSTANCE, + SqlToRelConverter.config()); + return sqlToRelConverter.convertQuery(parsed, false, true); + } + + protected RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) { + // 4. optimize relNode + // TODO: add support for traits, cost factory. + try { + _relOptPlanner.setRoot(relRoot.rel); + return _relOptPlanner.findBestExp(); + } catch (Exception e) { + throw new UnsupportedOperationException( + "Cannot generate a valid execution plan for the given query: " + RelOptUtil.toString(relRoot.rel), e); + } + } + + protected QueryPlan toDispatchablePlan(RelNode relRoot, PlannerContext plannerContext) { + // 5. construct a dispatchable query plan. + StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager); + return queryStagePlanner.makePlan(relRoot); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java new file mode 100644 index 0000000000..34673a274b --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.catalog; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +import static java.util.Objects.requireNonNull; + + +/** + * Simple Catalog that only contains list of tables. Backed by {@link TableCache}. + * + * <p>Catalog is needed for utilizing Apache Calcite's validator, which requires a root schema to store the + * entire catalog. In Pinot, since we don't have nested sub-catalog concept, we just return a flat list of schemas. + */ +public class PinotCatalog implements Schema { + + private final TableCache _tableCache; + + /** + * PinotCatalog needs have access to the actual {@link TableCache} object because TableCache hosts the actual + * table available for query and processes table/segment metadata updates when cluster status changes. + */ + public PinotCatalog(TableCache tableCache) { + _tableCache = tableCache; + } + + /** + * Acquire a table by its name. + * @param name name of the table. + * @return table object used by calcite planner. + */ + @Override + public Table getTable(String name) { + String tableName = TableNameBuilder.extractRawTableName(name); + return new PinotTable(_tableCache.getSchema(tableName)); + } + + /** + * acquire a set of available table names. + * @return the set of table names at the time of query planning. 
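// Illustrative lookups, assuming catalog is a PinotCatalog backed by the TableCache;
// the table name is hypothetical. getTable() strips the type suffix via
// TableNameBuilder before consulting the TableCache:
Set<String> tableNames = catalog.getTableNames();
Table table = catalog.getTable("myTable_OFFLINE");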
+ */ + @Override + public Set<String> getTableNames() { + return _tableCache.getTableNameMap().keySet(); + } + + @Override + public RelProtoDataType getType(String name) { + return null; + } + + @Override + public Set<String> getTypeNames() { + return Collections.emptySet(); + } + + @Override + public Collection<Function> getFunctions(String name) { + return Collections.emptyList(); + } + + @Override + public Set<String> getFunctionNames() { + return Collections.emptySet(); + } + + @Override + public Schema getSubSchema(String name) { + return null; + } + + @Override + public Set<String> getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) { + requireNonNull(parentSchema, "parentSchema"); + return Schemas.subSchemaExpression(parentSchema, name, getClass()); + } + + @Override + public boolean isMutable() { + return false; + } + + @Override + public Schema snapshot(SchemaVersion version) { + return this; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java new file mode 100644 index 0000000000..23e6444e90 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.catalog; + +import com.clearspring.analytics.util.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.spi.data.Schema; + + +/** + * Wrapper for pinot internal info for a table. + * + * <p>This construct is used to connect a Pinot table to Apache Calcite's relational planner by providing a + * {@link RelDataType} of the table to the planner. 
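// A minimal sketch of how the planner derives the relational row type, assuming a
// Pinot spi Schema instance named pinotSchema and the TypeFactory/TypeSystem
// constructors used elsewhere in this module:
PinotTable pinotTable = new PinotTable(pinotSchema);
RelDataType rowType = pinotTable.getRowType(new TypeFactory(new TypeSystem()));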
+ */ +public class PinotTable extends AbstractTable implements ScannableTable { + private Schema _schema; + + public PinotTable(Schema schema) { + _schema = schema; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { + Preconditions.checkState(relDataTypeFactory instanceof TypeFactory); + TypeFactory typeFactory = (TypeFactory) relDataTypeFactory; + return typeFactory.createRelDataTypeFromSchema(_schema); + } + + @Override + public boolean isRolledUp(String s) { + return false; + } + + @Override + public Enumerable<Object[]> scan(DataContext dataContext) { + return null; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java new file mode 100644 index 0000000000..1997d1f370 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.context; + +import java.util.Map; + + +/** + * PlannerContext is an object that holds all contextual information during planning phase. + * + * TODO: currently the planner context is not used since we don't support option or query rewrite. This construct is + * here as a placeholder for the parsed out options. + */ +public class PlannerContext { + private Map<String, String> _options; + + public void setOptions(Map<String, String> options) { + _options = options; + } + + public Map<String, String> getOptions() { + return _options; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java new file mode 100644 index 0000000000..fc75efb1ae --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java @@ -0,0 +1,502 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.parser; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.fun.SqlCase; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.ExpressionType; +import org.apache.pinot.common.request.Function; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.sql.parsers.SqlCompilationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Calcite parser to convert SQL expressions into {@link Expression}. + * + * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It only contains the + * {@link Expression} related info, this is used for ingestion and query rewrite. + */ +public class CalciteExpressionParser { + private static final Logger LOGGER = LoggerFactory.getLogger(CalciteExpressionParser.class); + + private CalciteExpressionParser() { + // do not instantiate. + } + + private static List<Expression> getAliasLeftExpressionsFromDistinctExpression(Function function) { + List<Expression> operands = function.getOperands(); + List<Expression> expressions = new ArrayList<>(operands.size()); + for (Expression operand : operands) { + if (isAsFunction(operand)) { + expressions.add(operand.getFunctionCall().getOperands().get(0)); + } else { + expressions.add(operand); + } + } + return expressions; + } + + public static boolean isAggregateExpression(Expression expression) { + Function functionCall = expression.getFunctionCall(); + if (functionCall != null) { + String operator = functionCall.getOperator(); + try { + AggregationFunctionType.getAggregationFunctionType(operator); + return true; + } catch (IllegalArgumentException e) { + } + if (functionCall.getOperandsSize() > 0) { + for (Expression operand : functionCall.getOperands()) { + if (isAggregateExpression(operand)) { + return true; + } + } + } + } + return false; + } + + public static boolean isAsFunction(Expression expression) { + return expression.getFunctionCall() != null && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS"); + } + + /** + * Extract all the identifiers from given expressions. + * + * @param expressions + * @param excludeAs if true, ignores the right side identifier for AS function. + * @return all the identifier names. 
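// Worked example (illustrative) of identifier extraction from a compiled expression:
Set<String> identifiers = CalciteExpressionParser.extractIdentifiers(
    Arrays.asList(CalciteExpressionParser.compileToExpression("colA + colB")), true);
// identifiers -> {"colA", "colB"}; with excludeAs set to true, an alias on the
// right-hand side of an AS function would not be reported.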
+ */ + public static Set<String> extractIdentifiers(List<Expression> expressions, boolean excludeAs) { + Set<String> identifiers = new HashSet<>(); + for (Expression expression : expressions) { + if (expression.getIdentifier() != null) { + identifiers.add(expression.getIdentifier().getName()); + } else if (expression.getFunctionCall() != null) { + if (excludeAs && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) { + identifiers.addAll( + extractIdentifiers(Arrays.asList(expression.getFunctionCall().getOperands().get(0)), true)); + continue; + } else { + identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands(), excludeAs)); + } + } + } + return identifiers; + } + + /** + * Compiles a String expression into {@link Expression}. + * + * @param expression String expression. + * @return {@link Expression} equivalent of the string. + * + * @throws SqlCompilationException if String is not a valid expression. + */ + public static Expression compileToExpression(String expression) { + SqlParser sqlParser = SqlParser.create(expression, ParserUtils.PARSER_CONFIG); + SqlNode sqlNode; + try { + sqlNode = sqlParser.parseExpression(); + } catch (SqlParseException e) { + throw new SqlCompilationException("Caught exception while parsing expression: " + expression, e); + } + return toExpression(sqlNode); + } + + private static List<Expression> convertDistinctSelectList(SqlNodeList selectList) { + List<Expression> selectExpr = new ArrayList<>(); + selectExpr.add(convertDistinctAndSelectListToFunctionExpression(selectList)); + return selectExpr; + } + + private static List<Expression> convertSelectList(SqlNodeList selectList) { + List<Expression> selectExpr = new ArrayList<>(); + + final Iterator<SqlNode> iterator = selectList.iterator(); + while (iterator.hasNext()) { + final SqlNode next = iterator.next(); + selectExpr.add(toExpression(next)); + } + + return selectExpr; + } + + private static List<Expression> convertOrderByList(SqlNodeList orderList) { + List<Expression> orderByExpr = new ArrayList<>(); + final Iterator<SqlNode> iterator = orderList.iterator(); + while (iterator.hasNext()) { + final SqlNode next = iterator.next(); + orderByExpr.add(convertOrderBy(next)); + } + return orderByExpr; + } + + private static Expression convertOrderBy(SqlNode node) { + final SqlKind kind = node.getKind(); + Expression expression; + switch (kind) { + case DESCENDING: + SqlBasicCall basicCall = (SqlBasicCall) node; + expression = RequestUtils.getFunctionExpression("DESC"); + expression.getFunctionCall().addToOperands(toExpression(basicCall.getOperandList().get(0))); + break; + case IDENTIFIER: + default: + expression = RequestUtils.getFunctionExpression("ASC"); + expression.getFunctionCall().addToOperands(toExpression(node)); + break; + } + return expression; + } + + /** + * DISTINCT is implemented as an aggregation function so need to take the select list items + * and convert them into a single function expression for handing over to execution engine + * either as a PinotQuery or BrokerRequest via conversion + * @param selectList select list items + * @return DISTINCT function expression + */ + private static Expression convertDistinctAndSelectListToFunctionExpression(SqlNodeList selectList) { + String functionName = AggregationFunctionType.DISTINCT.getName(); + Expression functionExpression = RequestUtils.getFunctionExpression(functionName); + for (SqlNode node : selectList) { + Expression columnExpression = toExpression(node); + if (columnExpression.getType() == 
ExpressionType.IDENTIFIER && columnExpression.getIdentifier().getName() + .equals("*")) { + throw new SqlCompilationException( + "Syntax error: Pinot currently does not support DISTINCT with *. Please specify each column name after " + + "DISTINCT keyword"); + } else if (columnExpression.getType() == ExpressionType.FUNCTION) { + Function functionCall = columnExpression.getFunctionCall(); + String function = functionCall.getOperator(); + if (AggregationFunctionType.isAggregationFunction(function)) { + throw new SqlCompilationException( + "Syntax error: Use of DISTINCT with aggregation functions is not supported"); + } + } + functionExpression.getFunctionCall().addToOperands(columnExpression); + } + return functionExpression; + } + + private static Expression toExpression(SqlNode node) { + LOGGER.debug("Current processing SqlNode: {}, node.getKind(): {}", node, node.getKind()); + switch (node.getKind()) { + case IDENTIFIER: + if (((SqlIdentifier) node).isStar()) { + return RequestUtils.getIdentifierExpression("*"); + } + if (((SqlIdentifier) node).isSimple()) { + return RequestUtils.getIdentifierExpression(((SqlIdentifier) node).getSimple()); + } + return RequestUtils.getIdentifierExpression(node.toString()); + case LITERAL: + return RequestUtils.getLiteralExpression((SqlLiteral) node); + case AS: + SqlBasicCall asFuncSqlNode = (SqlBasicCall) node; + List<SqlNode> operands = asFuncSqlNode.getOperandList(); + Expression leftExpr = toExpression(operands.get(0)); + SqlNode aliasSqlNode = operands.get(1); + String aliasName; + switch (aliasSqlNode.getKind()) { + case IDENTIFIER: + aliasName = ((SqlIdentifier) aliasSqlNode).getSimple(); + break; + case LITERAL: + aliasName = ((SqlLiteral) aliasSqlNode).toValue(); + break; + default: + throw new SqlCompilationException("Unsupported Alias sql node - " + aliasSqlNode); + } + Expression rightExpr = RequestUtils.getIdentifierExpression(aliasName); + // Just return left identifier if both sides are the same identifier. + if (leftExpr.isSetIdentifier() && rightExpr.isSetIdentifier()) { + if (leftExpr.getIdentifier().getName().equals(rightExpr.getIdentifier().getName())) { + return leftExpr; + } + } + final Expression asFuncExpr = RequestUtils.getFunctionExpression(SqlKind.AS.toString()); + asFuncExpr.getFunctionCall().addToOperands(leftExpr); + asFuncExpr.getFunctionCall().addToOperands(rightExpr); + return asFuncExpr; + case CASE: + // CASE WHEN Statement is model as a function with variable length parameters. + // Assume N is number of WHEN Statements, total number of parameters is (2 * N + 1). + // - N: Convert each WHEN Statement into a function Expression; + // - N: Convert each THEN Statement into an Expression; + // - 1: Convert ELSE Statement into an Expression. 
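// Illustrative layout: CASE WHEN w1 THEN t1 WHEN w2 THEN t2 ELSE e END is compiled
// into the function CASE(w1, w2, t1, t2, e): the N WHEN conditions first, followed
// by the N THEN values, and finally the single ELSE value.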
+ SqlCase caseSqlNode = (SqlCase) node; + SqlNodeList whenOperands = caseSqlNode.getWhenOperands(); + SqlNodeList thenOperands = caseSqlNode.getThenOperands(); + SqlNode elseOperand = caseSqlNode.getElseOperand(); + Expression caseFuncExpr = RequestUtils.getFunctionExpression(SqlKind.CASE.name()); + for (SqlNode whenSqlNode : whenOperands.getList()) { + Expression whenExpression = toExpression(whenSqlNode); + if (isAggregateExpression(whenExpression)) { + throw new SqlCompilationException( + "Aggregation functions inside WHEN Clause is not supported - " + whenSqlNode); + } + caseFuncExpr.getFunctionCall().addToOperands(whenExpression); + } + for (SqlNode thenSqlNode : thenOperands.getList()) { + Expression thenExpression = toExpression(thenSqlNode); + if (isAggregateExpression(thenExpression)) { + throw new SqlCompilationException( + "Aggregation functions inside THEN Clause is not supported - " + thenSqlNode); + } + caseFuncExpr.getFunctionCall().addToOperands(thenExpression); + } + Expression elseExpression = toExpression(elseOperand); + if (isAggregateExpression(elseExpression)) { + throw new SqlCompilationException( + "Aggregation functions inside ELSE Clause is not supported - " + elseExpression); + } + caseFuncExpr.getFunctionCall().addToOperands(elseExpression); + return caseFuncExpr; + default: + if (node instanceof SqlDataTypeSpec) { + // This is to handle expression like: CAST(col AS INT) + return RequestUtils.getLiteralExpression(((SqlDataTypeSpec) node).getTypeName().getSimple()); + } else { + return compileFunctionExpression((SqlBasicCall) node); + } + } + } + + private static Expression compileFunctionExpression(SqlBasicCall functionNode) { + SqlKind functionKind = functionNode.getKind(); + String functionName; + switch (functionKind) { + case AND: + return compileAndExpression(functionNode); + case OR: + return compileOrExpression(functionNode); + case COUNT: + SqlLiteral functionQuantifier = functionNode.getFunctionQuantifier(); + if (functionQuantifier != null && functionQuantifier.toValue().equalsIgnoreCase("DISTINCT")) { + functionName = AggregationFunctionType.DISTINCTCOUNT.name(); + } else { + functionName = AggregationFunctionType.COUNT.name(); + } + break; + case OTHER: + case OTHER_FUNCTION: + case DOT: + functionName = functionNode.getOperator().getName().toUpperCase(); + if (functionName.equals("ITEM") || functionName.equals("DOT")) { + // Calcite parses path expression such as "data[0][1].a.b[0]" into a chain of ITEM and/or DOT + // functions. Collapse this chain into an identifier. + StringBuffer path = new StringBuffer(); + compilePathExpression(functionName, functionNode, path); + return RequestUtils.getIdentifierExpression(path.toString()); + } + break; + default: + functionName = functionKind.name(); + break; + } + // When there is no argument, set an empty list as the operands + List<SqlNode> childNodes = functionNode.getOperandList(); + List<Expression> operands = new ArrayList<>(childNodes.size()); + for (SqlNode childNode : childNodes) { + if (childNode instanceof SqlNodeList) { + for (SqlNode node : (SqlNodeList) childNode) { + operands.add(toExpression(node)); + } + } else { + operands.add(toExpression(childNode)); + } + } + validateFunction(functionName, operands); + Expression functionExpression = RequestUtils.getFunctionExpression(functionName); + functionExpression.getFunctionCall().setOperands(operands); + return functionExpression; + } + + /** + * Convert Calcite operator tree made up of ITEM and DOT functions to an identifier. 
For example, the operator tree + * shown below will be converted to IDENTIFIER "jsoncolumn.data[0][1].a.b[0]". + * + * ├── ITEM(jsoncolumn.data[0][1].a.b[0]) + * ├── LITERAL (0) + * └── DOT (jsoncolumn.daa[0][1].a.b) + * ├── IDENTIFIER (b) + * └── DOT (jsoncolumn.data[0][1].a) + * ├── IDENTIFIER (a) + * └── ITEM (jsoncolumn.data[0][1]) + * ├── LITERAL (1) + * └── ITEM (jsoncolumn.data[0]) + * ├── LITERAL (1) + * └── IDENTIFIER (jsoncolumn.data) + * + * @param functionName Name of the function ("DOT" or "ITEM") + * @param functionNode Root node of the DOT and/or ITEM operator function chain. + * @param path String representation of path represented by DOT and/or ITEM function chain. + */ + private static void compilePathExpression(String functionName, SqlBasicCall functionNode, StringBuffer path) { + List<SqlNode> operands = functionNode.getOperandList(); + + // Compile first operand of the function (either an identifier or another DOT and/or ITEM function). + SqlKind kind0 = operands.get(0).getKind(); + if (kind0 == SqlKind.IDENTIFIER) { + path.append(operands.get(0).toString()); + } else if (kind0 == SqlKind.DOT || kind0 == SqlKind.OTHER_FUNCTION) { + SqlBasicCall function0 = (SqlBasicCall) operands.get(0); + String name0 = function0.getOperator().getName(); + if (name0.equals("ITEM") || name0.equals("DOT")) { + compilePathExpression(name0, function0, path); + } else { + throw new SqlCompilationException("SELECT list item has bad path expression."); + } + } else { + throw new SqlCompilationException("SELECT list item has bad path expression."); + } + + // Compile second operand of the function (either an identifier or literal). + SqlKind kind1 = operands.get(1).getKind(); + if (kind1 == SqlKind.IDENTIFIER) { + path.append(".").append(((SqlIdentifier) operands.get(1)).getSimple()); + } else if (kind1 == SqlKind.LITERAL) { + path.append("[").append(((SqlLiteral) operands.get(1)).toValue()).append("]"); + } else { + throw new SqlCompilationException("SELECT list item has bad path expression."); + } + } + + public static String canonicalize(String functionName) { + return StringUtils.remove(functionName, '_').toLowerCase(); + } + + public static boolean isSameFunction(String function1, String function2) { + return canonicalize(function1).equals(canonicalize(function2)); + } + + private static void validateFunction(String functionName, List<Expression> operands) { + switch (canonicalize(functionName)) { + case "jsonextractscalar": + validateJsonExtractScalarFunction(operands); + break; + case "jsonextractkey": + validateJsonExtractKeyFunction(operands); + break; + default: + break; + } + } + + private static void validateJsonExtractScalarFunction(List<Expression> operands) { + int numOperands = operands.size(); + + // Check that there are exactly 3 or 4 arguments + if (numOperands != 3 && numOperands != 4) { + throw new SqlCompilationException( + "Expect 3 or 4 arguments for transform function: jsonExtractScalar(jsonFieldName, 'jsonPath', " + + "'resultsType', ['defaultValue'])"); + } + if (!operands.get(1).isSetLiteral() || !operands.get(2).isSetLiteral() || (numOperands == 4 && !operands.get(3) + .isSetLiteral())) { + throw new SqlCompilationException( + "Expect the 2nd/3rd/4th argument of transform function: jsonExtractScalar(jsonFieldName, 'jsonPath'," + + " 'resultsType', ['defaultValue']) to be a single-quoted literal value."); + } + } + + private static void validateJsonExtractKeyFunction(List<Expression> operands) { + // Check that there are exactly 2 arguments + if (operands.size() != 
2) { + throw new SqlCompilationException( + "Expect 2 arguments are required for transform function: jsonExtractKey(jsonFieldName, 'jsonPath')"); + } + if (!operands.get(1).isSetLiteral()) { + throw new SqlCompilationException( + "Expect the 2nd argument for transform function: jsonExtractKey(jsonFieldName, 'jsonPath') to be a " + + "single-quoted literal value."); + } + } + + /** + * Helper method to flatten the operands for the AND expression. + */ + private static Expression compileAndExpression(SqlBasicCall andNode) { + List<Expression> operands = new ArrayList<>(); + for (SqlNode childNode : andNode.getOperandList()) { + if (childNode.getKind() == SqlKind.AND) { + Expression childAndExpression = compileAndExpression((SqlBasicCall) childNode); + operands.addAll(childAndExpression.getFunctionCall().getOperands()); + } else { + operands.add(toExpression(childNode)); + } + } + Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.AND.name()); + andExpression.getFunctionCall().setOperands(operands); + return andExpression; + } + + /** + * Helper method to flatten the operands for the OR expression. + */ + private static Expression compileOrExpression(SqlBasicCall orNode) { + List<Expression> operands = new ArrayList<>(); + for (SqlNode childNode : orNode.getOperandList()) { + if (childNode.getKind() == SqlKind.OR) { + Expression childAndExpression = compileOrExpression((SqlBasicCall) childNode); + operands.addAll(childAndExpression.getFunctionCall().getOperands()); + } else { + operands.add(toExpression(childNode)); + } + } + Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.OR.name()); + andExpression.getFunctionCall().setOperands(operands); + return andExpression; + } + + public static boolean isLiteralOnlyExpression(Expression e) { + if (e.getType() == ExpressionType.LITERAL) { + return true; + } + if (e.getType() == ExpressionType.FUNCTION) { + Function functionCall = e.getFunctionCall(); + if (functionCall.getOperator().equalsIgnoreCase(SqlKind.AS.toString())) { + return isLiteralOnlyExpression(functionCall.getOperands().get(0)); + } + return false; + } + return false; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java new file mode 100644 index 0000000000..d67896f9b9 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.parser; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.sql.parsers.SqlCompilationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class provide API to parse a SQL string into Pinot query {@link SqlNode}. + * + * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic + * to parsed SQL into {@link SqlNode} and use {@link QueryRewriter} to rewrite the query with Pinot specific + * contextual info. + */ +public class CalciteSqlParser { + private static final Logger LOGGER = LoggerFactory.getLogger(CalciteSqlParser.class); + + private CalciteSqlParser() { + // do not instantiate. + } + + /** + * entrypoint for Sql Parser. + */ + public static SqlNode compile(String sql, PlannerContext plannerContext) + throws SqlCompilationException { + // Extract OPTION statements from sql as Calcite Parser doesn't parse it. + // TODO: use parser syntax extension instead. + Map<String, String> options = parseOptions(extractOptionsFromSql(sql)); + plannerContext.setOptions(options); + if (!options.isEmpty()) { + sql = removeOptionsFromSql(sql); + } + // Compile Sql without OPTION statements. + SqlNode parsed = parse(sql); + + // query rewrite. + return QueryRewriter.rewrite(parsed, plannerContext); + } + + // ========================================================================== + // Static utils to parse the SQL. 
+ // ========================================================================== + + private static Map<String, String> parseOptions(List<String> optionsStatements) { + if (optionsStatements.isEmpty()) { + return Collections.emptyMap(); + } + Map<String, String> options = new HashMap<>(); + for (String optionsStatement : optionsStatements) { + for (String option : optionsStatement.split(",")) { + final String[] splits = option.split("="); + if (splits.length != 2) { + throw new SqlCompilationException("OPTION statement requires two parts separated by '='"); + } + options.put(splits[0].trim(), splits[1].trim()); + } + } + return options; + } + + private static SqlNode parse(String sql) { + SqlParser sqlParser = SqlParser.create(sql, ParserUtils.PARSER_CONFIG); + SqlNode sqlNode; + try { + sqlNode = sqlParser.parseQuery(); + } catch (SqlParseException e) { + throw new SqlCompilationException("Caught exception while parsing query: " + sql, e); + } + + // This is a special rewrite, + // TODO: move it to planner later. + SqlSelect selectNode; + if (sqlNode instanceof SqlOrderBy) { + // Store order-by info into the select sql node + SqlOrderBy orderByNode = (SqlOrderBy) sqlNode; + selectNode = (SqlSelect) orderByNode.query; + selectNode.setOrderBy(orderByNode.orderList); + selectNode.setFetch(orderByNode.fetch); + selectNode.setOffset(orderByNode.offset); + } else { + selectNode = (SqlSelect) sqlNode; + } + return selectNode; + } + + private static List<String> extractOptionsFromSql(String sql) { + List<String> results = new ArrayList<>(); + Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql); + while (matcher.find()) { + results.add(matcher.group(1)); + } + return results; + } + + private static String removeOptionsFromSql(String sql) { + Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql); + return matcher.replaceAll(""); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java new file mode 100644 index 0000000000..5422382509 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.parser; + +import java.util.regex.Pattern; +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl; +import org.apache.calcite.sql.validate.SqlConformanceEnum; + + +/** + * Utility provided to Calcite parser. + * + * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser} for its static constructs. 
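// A hypothetical round trip through CalciteSqlParser.compile showing how an OPTION
// clause is extracted (via OPTIONS_REGEX_PATTEN defined below) and surfaced through
// PlannerContext; the option key and value are made up for illustration:
PlannerContext plannerContext = new PlannerContext();
SqlNode parsed = CalciteSqlParser.compile("SELECT col1 FROM myTable OPTION (timeoutMs = 1000)", plannerContext);
// plannerContext.getOptions() -> {timeoutMs=1000}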
+ */ +final class ParserUtils { + /** Lexical policy similar to MySQL with ANSI_QUOTES option enabled. (To be + * precise: MySQL on Windows; MySQL on Linux uses case-sensitive matching, + * like the Linux file system.) The case of identifiers is preserved whether + * or not they quoted; after which, identifiers are matched + * case-insensitively. Double quotes allow identifiers to contain + * non-alphanumeric characters. */ + static final Lex PINOT_LEX = Lex.MYSQL_ANSI; + + // BABEL is a very liberal conformance value that allows anything supported by any dialect + static final SqlParser.Config PARSER_CONFIG = + SqlParser.configBuilder().setLex(PINOT_LEX).setConformance(SqlConformanceEnum.BABEL) + .setParserFactory(SqlBabelParserImpl.FACTORY).build(); + + // TODO: move this to use parser syntax extension. + // To Keep the backward compatibility with 'OPTION' Functionality in PQL, which is used to + // provide more hints for query processing. + // + // PQL syntax is: `OPTION (<key> = <value>)` + // + // Multiple OPTIONs is also supported by: + // either + // `OPTION (<k1> = <v1>, <k2> = <v2>, <k3> = <v3>)` + // or + // `OPTION (<k1> = <v1>) OPTION (<k2> = <v2>) OPTION (<k3> = <v3>)` + static final Pattern OPTIONS_REGEX_PATTEN = Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE); + + private ParserUtils() { + // do not instantiate. + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java new file mode 100644 index 0000000000..8cb0060416 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.parser; + +import org.apache.calcite.sql.SqlNode; +import org.apache.pinot.query.context.PlannerContext; + +/** + * Rewrites query based on user option as well as built-in rules. + */ +public class QueryRewriter { + + private QueryRewriter() { + // do not instantiate. + } + + /** + * Entrypoint to execute the query rewrite. + * + * It should be functionally identical to running {@link org.apache.pinot.sql.parsers.rewriter.QueryRewriter}. + * But it operates on a {@link SqlNode} tree instead of a flat pinot query object. + * + * @param sqlNodeRoot root of the sqlNode tree + * @param plannerContext planner context + * @return rewritten sqlNode. 
+ */ + public static SqlNode rewrite(SqlNode sqlNodeRoot, PlannerContext plannerContext) { + return sqlNodeRoot; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java new file mode 100644 index 0000000000..9844916490 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; + + +/** + * The {@code LogicalPlanner} is an extended implementation of the Calcite's {@link HepPlanner}. + */ +public class LogicalPlanner extends HepPlanner { + + private List<RelTraitDef> _traitDefs; + + public LogicalPlanner(HepProgram program, Context context) { + super(program, context); + _traitDefs = new ArrayList(); + } + + @Override + public boolean addRelTraitDef(RelTraitDef relTraitDef) { + return !_traitDefs.contains(relTraitDef) && _traitDefs.add(relTraitDef); + } + + @Override + public List<RelTraitDef> getRelTraitDefs() { + return _traitDefs; + } + + @Override + public RelTraitSet emptyTraitSet() { + RelTraitSet traitSet = super.emptyTraitSet(); + for (RelTraitDef traitDef : _traitDefs) { + if (traitDef.multiple()) { + // not supported + } + traitSet = traitSet.plus(traitDef.getDefault()); + } + return traitSet; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java new file mode 100644 index 0000000000..bdaba4ac70 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import java.util.Map; +import org.apache.pinot.query.planner.nodes.StageNode; + + +/** + * The {@code QueryPlan} is the dispatchable query execution plan from the result of {@link LogicalPlanner}. + * + * <p>QueryPlan should contain the necessary stage boundary information and the cross exchange information + * for: + * <ul> + * <li>dispatch individual stages to executor.</li> + * <li>instruct stage executor to establish connection channels to other stages.</li> + * <li>encode data blocks for transfer between stages based on partitioning scheme.</li> + * </ul> + */ +public class QueryPlan { + private Map<String, StageNode> _queryStageMap; + private Map<String, StageMetadata> _stageMetadataMap; + + public QueryPlan(Map<String, StageNode> queryStageMap, Map<String, StageMetadata> stageMetadataMap) { + _queryStageMap = queryStageMap; + _stageMetadataMap = stageMetadataMap; + } + + /** + * Get the map between stageID and the stage plan root node. + * @return stage plan map. + */ + public Map<String, StageNode> getQueryStageMap() { + return _queryStageMap; + } + + /** + * Get the stage metadata information. + * @return stage metadata info. + */ + public Map<String, StageMetadata> getStageMetadataMap() { + return _stageMetadataMap; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java new file mode 100644 index 0000000000..d167521f20 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.pinot.query.planner.nodes.CalcNode; +import org.apache.pinot.query.planner.nodes.JoinNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.nodes.TableScanNode; + + +/** + * The {@code StageNodeConverter} converts a logical {@link RelNode} to a {@link StageNode}. + */ +public final class RelToStageConverter { + + private RelToStageConverter() { + // do not instantiate. + } + + /** + * convert a normal relation node into stage node with just the expression piece. + * + * TODO: we should convert this to a more structured pattern once we determine the serialization format used. + * + * @param node relational node + * @return stage node. 
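Since QueryPlan pairs each stage id with both a plan root and its metadata, a dispatcher is expected to walk the two maps together. A hypothetical consumer loop (illustrative only; dispatching itself is out of scope for this module):

    import java.util.Map;
    import org.apache.pinot.core.transport.ServerInstance;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.planner.StageMetadata;
    import org.apache.pinot.query.planner.nodes.StageNode;

    // Illustrative only: pair every stage root with the servers assigned to it.
    final class QueryPlanDispatchSketch {
      static void dispatch(QueryPlan queryPlan) {
        for (Map.Entry<String, StageNode> entry : queryPlan.getQueryStageMap().entrySet()) {
          String stageId = entry.getKey();
          StageNode stageRoot = entry.getValue();
          StageMetadata metadata = queryPlan.getStageMetadataMap().get(stageId);
          for (ServerInstance server : metadata.getServerInstances()) {
            System.out.println("stage " + stageId + " -> " + server + ", root " + stageRoot.getStageId());
          }
        }
      }
    }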
+ */
+  public static StageNode toStageNode(RelNode node, String currentStageId) {
+    if (node instanceof LogicalCalc) {
+      return convertLogicalCal((LogicalCalc) node, currentStageId);
+    } else if (node instanceof LogicalTableScan) {
+      return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
+    } else if (node instanceof LogicalJoin) {
+      return convertLogicalJoin((LogicalJoin) node, currentStageId);
+    } else {
+      throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+    }
+  }
+
+  private static StageNode convertLogicalTableScan(LogicalTableScan node, String currentStageId) {
+    return new TableScanNode(node, currentStageId);
+  }
+
+  private static StageNode convertLogicalCal(LogicalCalc node, String currentStageId) {
+    return new CalcNode(node, currentStageId);
+  }
+
+  private static StageNode convertLogicalJoin(LogicalJoin node, String currentStageId) {
+    return new JoinNode(node, currentStageId);
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
new file mode 100644
index 0000000000..91eb5b77ca
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code StageMetadata} contains the information needed to dispatch a particular stage.
+ *
+ * <p>It contains information about:
+ * <ul>
+ *   <li>the tables the stage is supposed to scan</li>
+ *   <li>the underlying segments the stage is required to execute upon</li>
+ *   <li>the server instances on which the stage should be executed</li>
+ * </ul>
+ */
+public class StageMetadata implements Serializable {
+  private List<String> _scannedTables;
+
+  // used for assigning server/worker nodes.
+  private List<ServerInstance> _serverInstances;
+
+  // used for table scan stage.
+ private Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap; + + public StageMetadata() { + _scannedTables = new ArrayList<>(); + _serverInstances = new ArrayList<>(); + _serverInstanceToSegmentsMap = new HashMap<>(); + } + + public void attach(StageNode stageNode) { + if (stageNode instanceof TableScanNode) { + _scannedTables.add(((TableScanNode) stageNode).getTableName().get(0)); + } + } + + public List<String> getScannedTables() { + return _scannedTables; + } + + // ----------------------------------------------- + // attached physical plan context. + // ----------------------------------------------- + + public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() { + return _serverInstanceToSegmentsMap; + } + + public void setServerInstanceToSegmentsMap(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap) { + _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap; + } + + public List<ServerInstance> getServerInstances() { + return _serverInstances; + } + + public void setServerInstances(List<ServerInstance> serverInstances) { + _serverInstances = serverInstances; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java new file mode 100644 index 0000000000..175e77f6ef --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; +import org.apache.pinot.query.planner.nodes.MailboxSendNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.routing.WorkerManager; + + +/** + * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}. + * + * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. 
+ */ +public class StagePlanner { + private final PlannerContext _plannerContext; + private final WorkerManager _workerManager; + + private Map<String, StageNode> _queryStageMap; + private Map<String, StageMetadata> _stageMetadataMap; + private int _stageIdCounter; + + public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) { + _plannerContext = plannerContext; + _workerManager = workerManager; + } + + /** + * Construct the dispatchable plan from relational logical plan. + * + * @param relRoot relational plan root. + * @return dispatchable plan. + */ + public QueryPlan makePlan(RelNode relRoot) { + // clear the state + _queryStageMap = new HashMap<>(); + _stageMetadataMap = new HashMap<>(); + _stageIdCounter = 0; + + // walk the plan and create stages. + StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId()); + + // global root needs to send results back to the ROOT, a.k.a. the client response node. + // the last stage is always a broadcast-gather. + StageNode globalReceiverNode = + new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED); + StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, globalReceiverNode.getStageId(), + RelDistribution.Type.BROADCAST_DISTRIBUTED); + _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode); + StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId()); + stageMetadata.attach(globalSenderNode); + + _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode); + StageMetadata globalReceivingStageMetadata = new StageMetadata(); + globalReceivingStageMetadata.attach(globalReceiverNode); + _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata); + + // assign workers to each stage. + for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) { + _workerManager.assignWorkerToStage(e.getKey(), e.getValue()); + } + + return new QueryPlan(_queryStageMap, _stageMetadataMap); + } + + // non-threadsafe + private StageNode walkRelPlan(RelNode node, String currentStageId) { + if (isExchangeNode(node)) { + // 1. exchangeNode always have only one input, get its input converted as a new stage root. + StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId()); + RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType(); + + // 2. make an exchange sender and receiver node pair + StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType); + StageNode mailboxSender = new MailboxSendNode(nextStageRoot, mailboxReceiver.getStageId(), exchangeType); + + // 3. put the sender side as a completed stage. + _queryStageMap.put(mailboxSender.getStageId(), mailboxSender); + + // 4. return the receiver (this is considered as a "virtual table scan" node for its parent. 
+ return mailboxReceiver; + } else { + StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId); + List<RelNode> inputs = node.getInputs(); + for (RelNode input : inputs) { + stageNode.addInput(walkRelPlan(input, currentStageId)); + } + StageMetadata stageMetadata = _stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata()); + stageMetadata.attach(stageNode); + return stageNode; + } + } + + private boolean isExchangeNode(RelNode node) { + return (node instanceof LogicalExchange); + } + + private String getNewStageId() { + return String.valueOf(_stageIdCounter++); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java new file mode 100644 index 0000000000..71701df762 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.ArrayList; +import java.util.List; + + +public abstract class AbstractStageNode implements StageNode { + + protected final String _stageId; + protected final List<StageNode> _inputs; + + public AbstractStageNode(String stageId) { + _stageId = stageId; + _inputs = new ArrayList<>(); + } + + @Override + public List<StageNode> getInputs() { + return _inputs; + } + + @Override + public void addInput(StageNode stageNode) { + _inputs.add(stageNode); + } + + @Override + public String getStageId() { + return _stageId; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java new file mode 100644 index 0000000000..4b4ca9165e --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
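Putting the StagePlanner pieces together: walkRelPlan cuts a new stage at every LogicalExchange and stitches the stages back together with MailboxSendNode/MailboxReceiveNode pairs. A minimal usage sketch (illustrative only; the class name is hypothetical), with the expected shape for a simple join taken from QueryEnvironmentTest further below:

    import org.apache.calcite.rel.RelNode;
    import org.apache.pinot.query.context.PlannerContext;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.planner.StagePlanner;
    import org.apache.pinot.query.routing.WorkerManager;

    // Illustrative only: turn an optimized logical plan into a dispatchable QueryPlan.
    // For "SELECT * FROM a JOIN b ON a.col1 = b.col2" the test asserts 4 stages:
    //   "ROOT"      - reduce stage (MailboxReceiveNode on the broker/reducer)
    //   join stage  - JoinNode fed by two MailboxReceiveNodes, wrapped in a MailboxSendNode
    //   leaf stages - one TableScanNode per table, each wrapped in a MailboxSendNode
    final class StagePlannerUsageSketch {
      static QueryPlan plan(RelNode optimizedRoot, PlannerContext plannerContext, WorkerManager workerManager) {
        return new StagePlanner(plannerContext, workerManager).makePlan(optimizedRoot);
      }
    }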
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import org.apache.calcite.rel.logical.LogicalCalc; + + +public class CalcNode extends AbstractStageNode { + private final String _expression; + + public CalcNode(LogicalCalc node, String currentStageId) { + super(currentStageId); + _expression = toExpression(node); + } + + public String getExpression() { + return _expression; + } + + private String toExpression(LogicalCalc node) { + // TODO: make it real. + return node.getDigest(); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java new file mode 100644 index 0000000000..520af693d6 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.sql.SqlKind; +import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; + + +public class JoinNode extends AbstractStageNode { + private final JoinRelType _joinType; + private final int _leftOperandIndex; + private final int _rightOperandIndex; + private final FieldSelectionKeySelector _leftFieldSelectionKeySelector; + private final FieldSelectionKeySelector _rightFieldSelectionKeySelector; + + private transient final RelDataType _leftRowType; + private transient final RelDataType _rightRowType; + + public JoinNode(LogicalJoin node, String currentStageId) { + super(currentStageId); + _joinType = node.getJoinType(); + RexCall joinCondition = (RexCall) node.getCondition(); + Preconditions.checkState( + joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) && joinCondition.getOperands().size() == 2, + "only equality JOIN is supported"); + Preconditions.checkState(joinCondition.getOperands().get(0) instanceof RexInputRef, "only reference supported"); + Preconditions.checkState(joinCondition.getOperands().get(1) instanceof RexInputRef, "only reference supported"); + _leftRowType = node.getLeft().getRowType(); + _rightRowType = node.getRight().getRowType(); + _leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex(); + _rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex(); + _leftFieldSelectionKeySelector = new 
FieldSelectionKeySelector(_leftOperandIndex); + _rightFieldSelectionKeySelector = + new FieldSelectionKeySelector(_rightOperandIndex - _leftRowType.getFieldNames().size()); + } + + public JoinRelType getJoinType() { + return _joinType; + } + + public RelDataType getLeftRowType() { + return _leftRowType; + } + + public RelDataType getRightRowType() { + return _rightRowType; + } + + public int getLeftOperandIndex() { + return _leftOperandIndex; + } + + public int getRightOperandIndex() { + return _rightOperandIndex; + } + + public FieldSelectionKeySelector getLeftJoinKeySelector() { + return _leftFieldSelectionKeySelector; + } + + public FieldSelectionKeySelector getRightJoinKeySelector() { + return _rightFieldSelectionKeySelector; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java new file mode 100644 index 0000000000..947d449383 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import org.apache.calcite.rel.RelDistribution; + + +public class MailboxReceiveNode extends AbstractStageNode { + + private final String _senderStageId; + private final RelDistribution.Type _exchangeType; + + public MailboxReceiveNode(String stageId, String senderStageId, RelDistribution.Type exchangeType) { + super(stageId); + _senderStageId = senderStageId; + _exchangeType = exchangeType; + } + + @Override + public List<StageNode> getInputs() { + return Collections.emptyList(); + } + + @Override + public void addInput(StageNode stageNode) { + throw new UnsupportedOperationException("no input should be added to mailbox receive."); + } + + public String getSenderStageId() { + return _senderStageId; + } + + public RelDistribution.Type getExchangeType() { + return _exchangeType; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java new file mode 100644 index 0000000000..6db9578f35 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
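The right-side key selector above is offset by the left row width because Calcite numbers the join condition over the combined row type. For example, if the left input exposes four columns and the condition is a.col1 = b.col2 with col2 being the second column of b, then b.col2 appears as input ref 5, so the right selector is built with index 5 - 4 = 1. A small sketch of the resulting selectors (hypothetical class, not part of this change):

    import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;

    // Illustrative only: extract the join keys from a left row and a right row.
    final class JoinKeySelectorSketch {
      static Object[] joinKeys(Object[] leftRow, Object[] rightRow) {
        FieldSelectionKeySelector leftKey = new FieldSelectionKeySelector(0);   // a.col1 in the left row
        FieldSelectionKeySelector rightKey = new FieldSelectionKeySelector(1);  // b.col2 in the right row
        return new Object[]{leftKey.getKey(leftRow), rightKey.getKey(rightRow)};
      }
    }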
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import org.apache.calcite.rel.RelDistribution; + + +public class MailboxSendNode extends AbstractStageNode { + private final StageNode _stageRoot; + private final String _receiverStageId; + private final RelDistribution.Type _exchangeType; + + public MailboxSendNode(StageNode stageRoot, String receiverStageId, RelDistribution.Type exchangeType) { + super(stageRoot.getStageId()); + _stageRoot = stageRoot; + _receiverStageId = receiverStageId; + _exchangeType = exchangeType; + } + + @Override + public List<StageNode> getInputs() { + return Collections.singletonList(_stageRoot); + } + + @Override + public void addInput(StageNode queryStageRoot) { + throw new UnsupportedOperationException("mailbox cannot be changed!"); + } + + public String getReceiverStageId() { + return _receiverStageId; + } + + public RelDistribution.Type getExchangeType() { + return _exchangeType; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java new file mode 100644 index 0000000000..8fcbb5e01a --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.io.Serializable; +import java.util.List; + + +/** + * Stage Node is a serializable version of the {@link org.apache.calcite.rel.RelNode}. + * + * TODO: stage node currently uses java.io.Serializable as its serialization format. + * We should experiment with other type of serialization format for better performance. + * Essentially what we need is a way to exclude the planner context from the RelNode but only keeps the + * constructed relational content because we will no longer revisit the planner after stage is created. 
+ */ +public interface StageNode extends Serializable { + + List<StageNode> getInputs(); + + void addInput(StageNode stageNode); + + String getStageId(); +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java new file mode 100644 index 0000000000..0bb6e0f704 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.type.RelDataTypeField; + + +public class TableScanNode extends AbstractStageNode { + private final List<String> _tableName; + private final List<String> _tableScanColumns; + + public TableScanNode(LogicalTableScan tableScan, String stageId) { + super(stageId); + _tableName = tableScan.getTable().getQualifiedName(); + // TODO: optimize this, table field is not directly usable as name. + _tableScanColumns = + tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList()); + } + + @Override + public List<StageNode> getInputs() { + return Collections.emptyList(); + } + + @Override + public void addInput(StageNode queryStageRoot) { + throw new UnsupportedOperationException("TableScanNode cannot add input as it is a leaf node"); + } + + public List<String> getTableName() { + return _tableName; + } + + public List<String> getTableScanColumns() { + return _tableScanColumns; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java new file mode 100644 index 0000000000..9e0c776fd5 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
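Because StageNode extends Serializable, a stage root can be shipped to workers with plain JDK serialization until the TODO above lands on a better format. A minimal round-trip sketch (illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.pinot.query.planner.nodes.StageNode;

    // Illustrative only: serialize a stage plan and read it back.
    final class StageNodeSerdeSketch {
      static StageNode roundTrip(StageNode stageRoot) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(stageRoot);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          return (StageNode) in.readObject();
        }
      }
    }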
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.partitioning; + +import java.io.Serializable; + + +/** + * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}. + */ +public class FieldSelectionKeySelector implements KeySelector<Object[], Object>, Serializable { + + private int _columnIndex; + + public FieldSelectionKeySelector(int columnIndex) { + _columnIndex = columnIndex; + } + + @Override + public Object getKey(Object[] input) { + return input[_columnIndex]; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java new file mode 100644 index 0000000000..79dc987d9d --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.partitioning; + +/** + * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key. + * + * <p>This key selector is used for computation such as GROUP BY or equality JOINs. + * + * <p>Key selector should always produce the same selection hash key when the same input is provided. + */ +public interface KeySelector<IN, OUT> { + + /** + * Extract the key out of an input data construct. + * + * @param input input data. + * @return the key of the input data. + */ + OUT getKey(IN input); +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java new file mode 100644 index 0000000000..7ad6ee0b9c --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
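A KeySelector only needs to be deterministic; any partitioning scheme built on top of it works as long as equal inputs land on the same worker. A hypothetical partitioning helper (illustrative only, not necessarily how the runtime will do it):

    import org.apache.pinot.query.planner.partitioning.KeySelector;

    // Illustrative only: route a row to one of numWorkers partitions by hashing its key.
    final class KeyPartitionSketch {
      static int partitionFor(Object[] row, KeySelector<Object[], Object> keySelector, int numWorkers) {
        Object key = keySelector.getKey(row);
        return Math.floorMod(key == null ? 0 : key.hashCode(), numWorkers);
      }
    }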
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.routing; + +import java.util.Map; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * WorkerInstance is a wrapper around {@link ServerInstance}. + * + * <p>This can be considered as a simplified version which directly enable host-port initialization. + */ +public class WorkerInstance extends ServerInstance { + + public WorkerInstance(InstanceConfig instanceConfig) { + super(instanceConfig); + } + + public WorkerInstance(String hostname, int serverPort, int mailboxPort) { + super(toInstanceConfig(hostname, serverPort, mailboxPort)); + } + + private static InstanceConfig toInstanceConfig(String hostname, int serverPort, int mailboxPort) { + String server = String.format("%s_%d", hostname, serverPort); + InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server); + ZNRecord znRecord = instanceConfig.getRecord(); + Map<String, String> simpleFields = znRecord.getSimpleFields(); + simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, String.valueOf(mailboxPort)); + return instanceConfig; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java new file mode 100644 index 0000000000..a3102580b6 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.routing; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; + + +/** + * The {@code WorkerManager} manages stage to worker assignment. 
+ * + * <p>It contains the logic to assign worker to a particular stages. If it is a leaf stage the logic fallback to + * how Pinot server assigned server and server-segment mapping. + * + * TODO: Currently it is implemented by wrapping routing manager from Pinot Broker. however we can abstract out + * the worker manager later when we split out the query-spi layer. + */ +public class WorkerManager { + private static final CalciteSqlCompiler CALCITE_SQL_COMPILER = new CalciteSqlCompiler(); + + private final String _hostName; + private final int _port; + private final RoutingManager _routingManager; + + public WorkerManager(String hostName, int port, RoutingManager routingManager) { + _hostName = hostName; + _port = port; + _routingManager = routingManager; + } + + public void assignWorkerToStage(String stageId, StageMetadata stageMetadata) { + List<String> scannedTables = stageMetadata.getScannedTables(); + if (scannedTables.size() == 1) { // table scan stage, need to attach server as well as segment info. + RoutingTable routingTable = getRoutingTable(scannedTables.get(0)); + Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); + stageMetadata.setServerInstances(new ArrayList<>(serverInstanceToSegmentsMap.keySet())); + stageMetadata.setServerInstanceToSegmentsMap(new HashMap<>(serverInstanceToSegmentsMap)); + } else if (stageId.equalsIgnoreCase("ROOT")) { + // ROOT stage doesn't have a QueryServer as it is strictly only reducing results. + // here we simply assign the worker instance with identical server/mailbox port number. + stageMetadata.setServerInstances(Lists.newArrayList(new WorkerInstance(_hostName, _port, _port))); + } else { + stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values())); + } + } + + private static List<ServerInstance> filterServers(Collection<ServerInstance> servers) { + List<ServerInstance> serverInstances = new ArrayList<>(); + for (ServerInstance server : servers) { + String hostname = server.getHostname(); + if (!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !hostname.startsWith( + CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) && !hostname.startsWith( + CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE) && server.getGrpcPort() > 0) { + serverInstances.add(server); + } + } + return serverInstances; + } + + private RoutingTable getRoutingTable(String tableName) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + // TODO: support both offline and realtime, now we hard code offline table. + String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName); + return _routingManager.getRoutingTable(CALCITE_SQL_COMPILER.compileToBrokerRequest( + "SELECT * FROM " + tableNameWithType)); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java new file mode 100644 index 0000000000..2b35613f7a --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
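WorkerManager therefore fills in a StageMetadata differently per stage: leaf (table scan) stages get the routing table's servers and segment assignments, the ROOT stage gets the local reducer, and intermediate stages get all enabled servers with a gRPC port. A hypothetical caller (illustrative only; host and port are placeholders):

    import java.util.Map;
    import org.apache.pinot.core.routing.RoutingManager;
    import org.apache.pinot.query.planner.StageMetadata;
    import org.apache.pinot.query.routing.WorkerManager;

    // Illustrative only: hand every stage's metadata to the WorkerManager before dispatch.
    final class WorkerAssignmentSketch {
      static void assignAll(RoutingManager routingManager, Map<String, StageMetadata> stageMetadataMap) {
        WorkerManager workerManager = new WorkerManager("localhost", 8421, routingManager);  // placeholder host/port
        stageMetadataMap.forEach(workerManager::assignWorkerToStage);
      }
    }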
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.rules; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.tools.RelBuilderFactory; + + +/** + * Special rule for Pinot, always insert exchange after JOIN + */ +public class PinotExchangeNodeInsertRule extends RelOptRule { + public static final PinotExchangeNodeInsertRule INSTANCE = + new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER); + + public PinotExchangeNodeInsertRule(RelBuilderFactory factory) { + super(operand(LogicalJoin.class, any()), factory, null); + } + + @Override + public boolean matches(RelOptRuleCall call) { + if (call.rels.length < 1) { + return false; + } + if (call.rel(0) instanceof Join) { + Join join = call.rel(0); + return !isExchange(join.getLeft()) && !isExchange(join.getRight()); + } + return false; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Join join = call.rel(0); + RelNode leftInput = join.getInput(0); + RelNode rightInput = join.getInput(1); + + RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON); + RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED); + + RelNode newJoinNode = + new LogicalJoin(join.getCluster(), join.getTraitSet(), leftExchange, rightExchange, join.getCondition(), + join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(), + ImmutableList.copyOf(join.getSystemFieldList())); + + call.transformTo(newJoinNode); + } + + private static boolean isExchange(RelNode rel) { + RelNode reference = rel; + if (reference instanceof HepRelVertex) { + reference = ((HepRelVertex) reference).getCurrentRel(); + } + return reference instanceof Exchange; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java new file mode 100644 index 0000000000..1b4e0850ac --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
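In effect the rule rewrites every un-exchanged logical join so that both inputs cross an exchange, which is where the StagePlanner later cuts stage boundaries. Roughly (an illustrative sketch of the shape, not actual planner output):

    before:  LogicalJoin(LogicalTableScan[a], LogicalTableScan[b])
    after:   LogicalJoin(
               LogicalExchange[SINGLETON](LogicalTableScan[a]),
               LogicalExchange[BROADCAST_DISTRIBUTED](LogicalTableScan[b]))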
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.rules; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.calcite.adapter.enumerable.EnumerableRules; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.rel.rules.CoreRules; +import org.apache.calcite.rel.rules.PruneEmptyRules; + + +/** + * Default rule sets for Pinot query + */ +public class PinotQueryRuleSets { + private PinotQueryRuleSets() { + // do not instantiate. + } + + public static final Collection<RelOptRule> LOGICAL_OPT_RULES = + Arrays.asList(EnumerableRules.ENUMERABLE_FILTER_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE, + EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_SORT_RULE, + EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE, + + // push a filter into a join + CoreRules.FILTER_INTO_JOIN, + // push filter through an aggregation + CoreRules.FILTER_AGGREGATE_TRANSPOSE, + // push filter through set operation + CoreRules.FILTER_SET_OP_TRANSPOSE, + // push project through set operation + CoreRules.PROJECT_SET_OP_TRANSPOSE, + + // aggregation and projection rules + CoreRules.AGGREGATE_PROJECT_MERGE, CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS, + // push a projection past a filter or vice versa + CoreRules.PROJECT_FILTER_TRANSPOSE, CoreRules.FILTER_PROJECT_TRANSPOSE, + // push a projection to the children of a join + // push all expressions to handle the time indicator correctly + CoreRules.JOIN_CONDITION_PUSH, + // merge projections + CoreRules.PROJECT_MERGE, + // remove identity project + CoreRules.PROJECT_REMOVE, + // reorder sort and projection + CoreRules.SORT_PROJECT_TRANSPOSE, + + // join rules + CoreRules.JOIN_PUSH_EXPRESSIONS, + + // convert non-all union into all-union + distinct + CoreRules.UNION_TO_DISTINCT, + + // remove aggregation if it does not aggregate and input is already distinct + CoreRules.AGGREGATE_REMOVE, + // push aggregate through join + CoreRules.AGGREGATE_JOIN_TRANSPOSE, + // aggregate union rule + CoreRules.AGGREGATE_UNION_AGGREGATE, + + // reduce aggregate functions like AVG, STDDEV_POP etc. + CoreRules.AGGREGATE_REDUCE_FUNCTIONS, + + // remove unnecessary sort rule + CoreRules.SORT_REMOVE, + + // prune empty results rules + PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE, + PruneEmptyRules.JOIN_RIGHT_INSTANCE, PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.UNION_INSTANCE, + + // Pinot specific rules + PinotExchangeNodeInsertRule.INSTANCE); +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java new file mode 100644 index 0000000000..55797b0e52 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.type; + +import java.util.Map; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; + + +/** + * Extends Java-base TypeFactory from Calcite. + * + * <p>{@link JavaTypeFactoryImpl} is used here because we are not overriding much of the TypeFactory methods + * required by Calcite. We will start extending {@link SqlTypeFactoryImpl} or even {@link RelDataTypeFactory} + * when necessary for Pinot to override such mechanism. + * + * <p>Noted that {@link JavaTypeFactoryImpl} is subject to change. Please pay extra attention to this class when + * upgrading Calcite versions. + */ +public class TypeFactory extends JavaTypeFactoryImpl { + private final RelDataTypeSystem _typeSystem; + + public TypeFactory(RelDataTypeSystem typeSystem) { + _typeSystem = typeSystem; + } + + public RelDataType createRelDataTypeFromSchema(Schema schema) { + Builder builder = new Builder(this); + for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) { + builder.add(e.getKey(), toRelDataType(e.getValue())); + } + return builder.build(); + } + + private RelDataType toRelDataType(FieldSpec fieldSpec) { + switch (fieldSpec.getDataType()) { + case INT: + return createSqlType(SqlTypeName.INTEGER); + case LONG: + return createSqlType(SqlTypeName.BIGINT); + case FLOAT: + return createSqlType(SqlTypeName.FLOAT); + case DOUBLE: + return createSqlType(SqlTypeName.DOUBLE); + case BOOLEAN: + return createSqlType(SqlTypeName.BOOLEAN); + case TIMESTAMP: + return createSqlType(SqlTypeName.TIMESTAMP); + case STRING: + return createSqlType(SqlTypeName.VARCHAR); + case BYTES: + return createSqlType(SqlTypeName.VARBINARY); + case JSON: + // TODO: support JSON, JSON should be supported using a special RelDataType as it is not a simple String, + // nor can it be easily parsed as a STRUCT. + case LIST: + // TODO: support LIST, MV column should go fall into this category. + case STRUCT: + case MAP: + default: + throw new UnsupportedOperationException("unsupported!"); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java new file mode 100644 index 0000000000..b2606d9296 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
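TypeFactory maps Pinot FieldSpec data types onto Calcite SQL types one field at a time, so a row type can be derived straight from a table schema. A small usage sketch (illustrative only), reusing the same schema-building calls as the test utilities further below:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.pinot.query.type.TypeFactory;
    import org.apache.pinot.query.type.TypeSystem;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;

    // Illustrative only: STRING -> VARCHAR, INT -> INTEGER, etc., per toRelDataType.
    final class TypeFactorySketch {
      static RelDataType rowTypeOf() {
        Schema schema = new Schema.SchemaBuilder().setSchemaName("a")
            .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
            .addMetric("col3", FieldSpec.DataType.INT, 0)
            .build();
        return new TypeFactory(new TypeSystem()).createRelDataTypeFromSchema(schema);
      }
    }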
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.type; + +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + + +/** + * The {@code TypeSystem} overwrites Calcite type system with Pinot specific logics. + * + * TODO: no overwrite for now. + */ +public class TypeSystem extends RelDataTypeSystemImpl { +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java new file mode 100644 index 0000000000..f774976c21 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.validate; + +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + + +/** + * The {@code Validator} overwrites Calcite's Validator with Pinot specific logics. + */ +public class Validator extends SqlValidatorImpl { + + public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) { + // TODO: support BABEL validator. Currently parser conformance is set to use BABEL. + super(opTab, catalogReader, typeFactory, Config.DEFAULT); + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java new file mode 100644 index 0000000000..911469644b --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query; + +import com.google.common.collect.ImmutableList; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.calcite.jdbc.CalciteSchemaBuilder; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.externalize.RelXmlWriter; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlNode; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.query.catalog.PinotCatalog; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.type.TypeSystem; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class QueryEnvironmentTest { + private QueryEnvironment _queryEnvironment; + + @BeforeClass + public void setUp() { + // the port doesn't matter as we are not actually making a server call. + RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2); + _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), + CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())), + new WorkerManager("localhost", 3, routingManager)); + } + + @Test + public void testSqlStrings() + throws Exception { + testQueryParsing("SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0", + "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` = `b`.`col2`\n" + "WHERE `a`.`col3` >= 0"); + } + + @Test + public void testQueryToStages() + throws Exception { + PlannerContext plannerContext = new PlannerContext(); + String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2"; + QueryPlan queryPlan = _queryEnvironment.planQuery(query); + Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4); + Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4); + for (Map.Entry<String, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) { + List<String> tables = e.getValue().getScannedTables(); + if (tables.size() == 1) { + // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1 + Assert.assertEquals( + e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()), + tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1", "Server_localhost_2") + : ImmutableList.of("Server_localhost_1")); + } else if (!e.getKey().equals("ROOT")) { + // join stage should have both servers used. 
+ Assert.assertEquals( + e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()), + ImmutableList.of("Server_localhost_1", "Server_localhost_2")); + } else { + // the reduce stage should only have the reducer instance. + Assert.assertEquals( + e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()), + ImmutableList.of("Server_localhost_3")); + } + } + } + + @Test + public void testQueryToRel() + throws Exception { + PlannerContext plannerContext = new PlannerContext(); + String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"; + SqlNode parsed = _queryEnvironment.parse(query, plannerContext); + SqlNode validated = _queryEnvironment.validate(parsed); + RelRoot relRoot = _queryEnvironment.toRelation(validated, plannerContext); + RelNode optimized = _queryEnvironment.optimize(relRoot, plannerContext); + + // Assert that the relational plan can be written into an ALL_ATTRIBUTES digest. + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + RelWriter planWriter = new RelXmlWriter(pw, SqlExplainLevel.ALL_ATTRIBUTES); + optimized.explain(planWriter); + Assert.assertNotNull(sw.toString()); + } + + private void testQueryParsing(String query, String digest) + throws Exception { + PlannerContext plannerContext = new PlannerContext(); + SqlNode sqlNode = _queryEnvironment.parse(query, plannerContext); + _queryEnvironment.validate(sqlNode); + Assert.assertEquals(sqlNode.toString(), digest); + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java new file mode 100644 index 0000000000..cc0db385db --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.calcite.jdbc.CalciteSchemaBuilder; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.query.catalog.PinotCatalog; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.routing.WorkerInstance; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.type.TypeSystem; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Base query environment test utility that provides a set of mock tables / schemas so that + * we can run simple query planning and produce stages / metadata for other components to test. + */ +public class QueryEnvironmentTestUtils { + public static final Schema.SchemaBuilder SCHEMA_BUILDER; + public static final Map<String, List<String>> SERVER1_SEGMENTS = + ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b", Lists.newArrayList("b1"), "c", + Lists.newArrayList("c1")); + public static final Map<String, List<String>> SERVER2_SEGMENTS = + ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3")); + + static { + SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "") + .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "") + .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS") + .addMetric("col3", FieldSpec.DataType.INT, 0); + } + + private QueryEnvironmentTestUtils() { + // do not instantiate. + } + + public static TableCache mockTableCache() { + TableCache mock = mock(TableCache.class); + when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a", "a", "b", "b", "c", "c")); + when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build()); + when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build()); + when(mock.getSchema("c")).thenReturn(SCHEMA_BUILDER.setSchemaName("c").build()); + return mock; + } + + public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2) { + RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(port1, port2); + return new QueryEnvironment(new TypeFactory(new TypeSystem()), + CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())), + new WorkerManager("localhost", reducerPort, routingManager)); + } + + public static RoutingManager getMockRoutingManager(int port1, int port2) { + String server1 = String.format("localhost_%d", port1); + String server2 = String.format("localhost_%d", port2); + // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. + // this is only used for test identification purposes. 
+ ServerInstance host1 = new WorkerInstance("localhost", port1, port1); + ServerInstance host2 = new WorkerInstance("localhost", port2, port2); + + RoutingTable rtA = mock(RoutingTable.class); + when(rtA.getServerInstanceToSegmentsMap()).thenReturn( + ImmutableMap.of(host1, SERVER1_SEGMENTS.get("a"), host2, SERVER2_SEGMENTS.get("a"))); + RoutingTable rtB = mock(RoutingTable.class); + when(rtB.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1, SERVER1_SEGMENTS.get("b"))); + RoutingTable rtC = mock(RoutingTable.class); + when(rtC.getServerInstanceToSegmentsMap()).thenReturn( + ImmutableMap.of(host1, SERVER1_SEGMENTS.get("c"), host2, SERVER2_SEGMENTS.get("c"))); + Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA, "b", rtB, "c", rtC); + RoutingManager mock = mock(RoutingManager.class); + when(mock.getRoutingTable(any())).thenAnswer(invocation -> { + BrokerRequest brokerRequest = invocation.getArgument(0); + String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName(); + return mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName)); + }); + when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1, host1, server2, host2)); + return mock; + } + + public static String getTestStageByServerCount(QueryPlan queryPlan, int serverCount) { + List<String> stageIds = queryPlan.getStageMetadataMap().entrySet().stream() + .filter(e -> !e.getKey().equals("ROOT") && e.getValue().getServerInstances().size() == serverCount) + .map(Map.Entry::getKey).collect(Collectors.toList()); + return stageIds.size() > 0 ? stageIds.get(0) : null; + } + + public static int getAvailablePort() { + try { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } catch (IOException e) { + throw new RuntimeException("Failed to find an available port to use", e); + } + } +} diff --git a/pom.xml b/pom.xml index 8fa7ab76c1..56e6ade7e9 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,8 @@ <module>pinot-segment-local</module> <module>pinot-compatibility-verifier</module> <module>contrib/pinot-fmpp-maven-plugin</module> + + <module>pinot-query-planner</module> </modules> <licenses> @@ -966,10 +968,6 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> </exclusion> - <exclusion> - <groupId>com.esri.geometry</groupId> - <artifactId>esri-geometry-api</artifactId> - </exclusion> <exclusion> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId>
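
For anyone trying out the new pinot-query-planner module, the test utilities above suggest a minimal way to exercise the planner end to end. The sketch below is illustrative only and not part of this commit: the class name PlannerSmokeTest is made up, it assumes the pinot-query-planner test-jar is on the classpath so that QueryEnvironmentTestUtils is visible, and it uses only APIs that appear in the diff above.

    // Hypothetical smoke test, not part of this commit; mirrors setUp() and testQueryToStages() above.
    import java.util.Map;
    import org.apache.pinot.query.QueryEnvironment;
    import org.apache.pinot.query.QueryEnvironmentTestUtils;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.planner.StageMetadata;

    public class PlannerSmokeTest {
      public static void main(String[] args) throws Exception {
        // Ports are only identifiers for the mocked routing manager; no sockets are opened.
        QueryEnvironment env = QueryEnvironmentTestUtils.getQueryEnvironment(3, 1, 2);
        QueryPlan plan = env.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
        for (Map.Entry<String, StageMetadata> e : plan.getStageMetadataMap().entrySet()) {
          // Each stage records which tables it scans and which server instances the WorkerManager assigned.
          System.out.println(e.getKey() + " -> tables=" + e.getValue().getScannedTables()
              + ", servers=" + e.getValue().getServerInstances());
        }
      }
    }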
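
A second illustrative sketch, again not part of the commit and mirroring testQueryToRel() above, shows the explicit parse / validate / toRelation / optimize pipeline and dumps the optimized relational plan as an XML digest for debugging; the class name PlanDebugger is hypothetical.

    // Hypothetical debugging helper; every call below also appears in testQueryToRel() above.
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.RelRoot;
    import org.apache.calcite.rel.externalize.RelXmlWriter;
    import org.apache.calcite.sql.SqlExplainLevel;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.pinot.query.QueryEnvironment;
    import org.apache.pinot.query.QueryEnvironmentTestUtils;
    import org.apache.pinot.query.context.PlannerContext;

    public class PlanDebugger {
      public static void main(String[] args) throws Exception {
        QueryEnvironment env = QueryEnvironmentTestUtils.getQueryEnvironment(3, 1, 2);
        PlannerContext plannerContext = new PlannerContext();
        // Step through the planner instead of calling planQuery(), so each intermediate form is visible.
        SqlNode parsed = env.parse("SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0", plannerContext);
        SqlNode validated = env.validate(parsed);
        RelRoot relRoot = env.toRelation(validated, plannerContext);
        RelNode optimized = env.optimize(relRoot, plannerContext);
        // Write the optimized relational plan as an ALL_ATTRIBUTES XML digest.
        StringWriter sw = new StringWriter();
        optimized.explain(new RelXmlWriter(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES));
        System.out.println(sw);
      }
    }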